首页 > 其他分享 >利用Kafaka发送系统通知(27)

利用Kafaka发送系统通知(27)

时间:2022-10-29 18:14:52浏览次数:51  
标签:27 return int event 发送 Kafaka import public Event

事件驱动的方式

1 需求分析

需求:当用户评论帖子,点赞或关注其他用户时,系统给对应的用户发送通知。
思路:封装好一个通用的事件类,保存事件的发送者,接受者、事件主题(评论、点赞、关注)和事件内容。当用户有评论点赞等行为时,构造事件类。通过Kafaka,生产者将事件发布不同的主题,同时消费者监听不同的主题并作出相应的处理(这里为了简单起见将不同的主题一起监听)。

2 处理事件

  • 封装事件对象
    event.java

set方法返回this,方便连续调用set方法

import java.util.HashMap;
import java.util.Map;

public class Event {

    private String topic;
    // 发送者
    private int userId;
    private int entityType;
    private int entityId;
    // 接收者
    private int entityUserId;
    private Map<String, Object> data = new HashMap<>();

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Event setData(String key, Object value) {
        this.data.put(key, value);
        return this;
    }
}
  • 开发事件的生产者

主动触发

import com.alibaba.fastjson.JSONObject;
import com.nowcoder.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 处理事件
    public void fireEvent(Event event) {
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }

}
  • 开发事件的消费者

被动触发

主题定义CommunityConstant.java

public interface CommunityConstant {
    /**
     * 主题: 评论
     */
    String TOPIC_COMMENT = "comment";

    /**
     * 主题: 点赞
     */
    String TOPIC_LIKE = "like";

    /**
     * 主题: 关注
     */
    String TOPIC_FOLLOW = "follow";

    /**
     * 系统用户ID
     */
    int SYSTEM_USER_ID = 1;

}

事件消费者EventConsumer.java

import com.alibaba.fastjson.JSONObject;
import com.nowcoder.community.entity.Event;
import com.nowcoder.community.entity.Message;
import com.nowcoder.community.service.MessageService;
import com.nowcoder.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Component
public class EventConsumer implements CommunityConstant {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private MessageService messageService;

    @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }

        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }

        // 发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        // 构造消息内容
        Map<String, Object> content = new HashMap<>();
        content.put("userId", event.getUserId());
        content.put("entityType", event.getEntityType());
        content.put("entityId", event.getEntityId());

        // 存放额外内容
        if (!event.getData().isEmpty()) {
            for (Map.Entry<String, Object> entry : event.getData().entrySet()) {
                content.put(entry.getKey(), entry.getValue());
            }
        }

        // 将数据存到message表
        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);
    }
}

3 触发事件

  • 评论后,发布通知
  • 点赞后,发布通知
  • 关注后,发布通知

以评论为例CommentController.java

import com.nowcoder.community.entity.Comment;
import com.nowcoder.community.entity.DiscussPost;
import com.nowcoder.community.entity.Event;
import com.nowcoder.community.event.EventProducer;
import com.nowcoder.community.service.CommentService;
import com.nowcoder.community.service.DiscussPostService;
import com.nowcoder.community.util.CommunityConstant;
import com.nowcoder.community.util.HostHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import java.util.Date;

@Controller
@RequestMapping("/comment")
public class CommentController implements CommunityConstant {

    @Autowired
    private CommentService commentService;

    @Autowired
    private HostHolder hostHolder;

    @Autowired
    private EventProducer eventProducer;

    @Autowired
    private DiscussPostService discussPostService;

    @RequestMapping(path = "/add/{discussPostId}", method = RequestMethod.POST)
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
        comment.setUserId(hostHolder.getUser().getId());
        comment.setStatus(0);
        comment.setCreateTime(new Date());
        commentService.addComment(comment);

        // 触发评论事件
        Event event = new Event()
                .setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(comment.getEntityType())
                .setEntityId(comment.getEntityId())
                .setData("postId", discussPostId);
        if (comment.getEntityType() == ENTITY_TYPE_POST) {
            DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }
        eventProducer.fireEvent(event);

        return "redirect:/discuss/detail/" + discussPostId;
    }
}

4 显示通知

用户进入个人通知页面时,查询该用户的各种通知消息,封装到Map中提供模板使用。具体步骤略。

Message.java

    // ...
    // 查询某个主题下最新的通知
    Message selectLatestNotice(int userId, String topic);

    // 查询某个主题所包含的通知数量
    int selectNoticeCount(int userId, String topic);

    // 查询未读的通知的数量
    int selectNoticeUnreadCount(int userId, String topic);

    // 查询某个主题所包含的通知列表
    List<Message> selectNotices(int userId, String topic, int offset, int limit);

message-mapper.xml

    <select id="selectLatestNotice" resultType="Message">
        select <include refid="selectFields"></include>
        from message
        where id in (
            select max(id) from message
            where status != 2
            and from_id = 1
            and to_id = #{userId}
            and conversation_id = #{topic}
        )
    </select>

    <select id="selectNoticeCount" resultType="int">
        select count(id) from message
        where status != 2
        and from_id = 1
        and to_id = #{userId}
        and conversation_id = #{topic}
    </select>

    <select id="selectNoticeUnreadCount" resultType="int">
        select count(id) from message
        where status = 0
        and from_id = 1
        and to_id = #{userId}
        <if test="topic!=null">
            and conversation_id = #{topic}
        </if>
    </select>

标签:27,return,int,event,发送,Kafaka,import,public,Event
From: https://www.cnblogs.com/dalelee/p/16838852.html

相关文章

  • 2022/10/27
    至此我每看到那些场景都会回想想起那年的今日有的是在阳光下的倾斜的长坡坡顶有家便利店卖的是散装的用袋子装的啤酒味道有点甜带着回甘最上面有一家教堂很大,从来......
  • react实战笔记27:渲染列表
      数组遍历进行显示 ......
  • java FTP连接时出现“227 Entering Passive Mode”的解决方法
    FTPClientftpClient=newFTPClient();ftpClient.connect(ftpAddress,ftpPort);//连接FTP服务器ftpClient.login(ftpUserName,ftpPassword);//登陆FTP服......
  • Centos 修改SSH端口报错:error: Bind to port 27615 on 0.0.0.0 failed: Permission de
    报错截图问题原因selinux问题解决办法修改selinux中的sshd的端口#安装修改工具$yum-yinstallpolicycoreutils-python#查看selinux中的sshd的端口,输出为......
  • 代码随想录day27 | 39. 组合总和 40.组合总和II 131.分割回文串
    39.组合总和题目|文章思路题目的关键点在于无限制重复选取,那么可以不用去重,只要和sum==target就可以返回。实现点击查看代码classSolution{public:vec......
  • Codeforces Round #827 (Div. 4) A-G
    比赛链接A题解知识点:模拟。时间复杂度\(O(1)\)空间复杂度\(O(1)\)代码#include<bits/stdc++.h>#definelllonglongusingnamespacestd;boolsolve(){......
  • Spring整合Kafaka(二十六)
    1引入依赖<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>2配置Kafka配置server、cons......
  • P8627
    题目描述题目描述很简单,一共\(n\)瓶饮料,每\(3\)瓶饮料的瓶盖可以再换\(1\)瓶新的饮料,问最后一共喝了几瓶饮料?题意分析此题一共有\(3\)种做法,接下来我们一个一个......
  • # 19. 数据类型的转换——27. 程序的流程控制
    19.数据类型的转换——27.程序的流程控制19.数据类型的转换一个类型的值转换为另一个类型的值,所有类型转换必须是显示声明转换后的类型:=要转换的类型(变量)packagema......
  • Kafaka安装与配置(二十五)
    1.简介Kafka是一个分布式的流媒体平台。Kafka可以应用于消息系统、日志收集、用户行为追踪、流式处理等多种场景。Kafka具有高吞吐量、消息持久化、高可靠性、高扩展性......