首页 > 其他分享 >向mq写消息

向mq写消息

时间:2023-12-15 19:22:50浏览次数:24  
标签:producer mqLog MQLog mq 消息 org import public

1.基础版本

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import com.alibaba.fastjson.JSON;

public class MQProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("G-Group_REQ");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String jsonStr = JSON.toJSONString("Your JSON content");
        Message msg = new Message("T-Topic", jsonStr.getBytes());

        producer.send(msg);
        producer.shutdown();
    }
}

2.添加写消息失败处理:登记日志、定时处理、参数配置化

import cn.com.*.support.MQLogSupport;
import cn.com.*.entity.MQLog;
import cn.com.*.repository.MQLogRepository;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

@Component
public class MQLogSupportImpl implements MQLogSupport{
    private final Logger logger = LoggerFactory.getLogger(MQLogSupport.class);
    @Autowired
    private MQLogRepository mqLogRepository;

    @Value("${rocketmq.name-server}")
    String namesrvAddr;

    // 写消息服务
    public void sendMessageNoTag(MQLog mqLog) {
        try {
            DefaultMQProducer producer = new DefaultMQProducer(mqLog.getMqGroup());
            producer.setNamesrvAddr(namesrvAddr);
            // 设置发送消息的超时时间为30秒
            producer.setSendMsgTimeout(30000);
            producer.start();
            Message msg = new Message(mqLog.getMqTopic(), mqLog.getMqMessage().getBytes());
            producer.send(msg);
            setMQlogSuccess(mqLog);
            producer.shutdown();
        } catch (Exception e) {
            setMQlogError(mqLog, "9999", "初次:" + e.getMessage());
        }
    }

    // 定时任务,失败后重新写消息,每分钟执行一次
    @Scheduled(cron = "0 0/1 * * * ?")
    @SchedulerLock(name = "RocketMQ", lockAtLeastForString = "PT5M", lockAtMostForString = "PT30M")
    public void readAndResendMessage() {
        // 读取失败信息
        List<MQLog> messageList = mqLogRepository.findByDealCodeNot("0000");
        if(CollectionUtils.isEmpty(messageList)){
            return;
        }
        // 重新写消息
        for (MQLog message : messageList) {
            try {
                DefaultMQProducer producer = new DefaultMQProducer(message.getMqGroup());
                producer.setNamesrvAddr(namesrvAddr);
                producer.setSendMsgTimeout(30000);
                producer.start();
                Message msg = new Message(message.getMqTopic(), message.getMqMessage().getBytes());
                producer.send(msg);
                setMQlogSuccess(message);
                producer.shutdown();
            } catch (Exception e) {
                setMQlogError(message, "定时任务:" + e.getMessage());
            }
        }
    }

    public MQLog regMQlog(JSONObject mqLogJson){
        logger.debug("登记 mq 消息......");
        String topic = mqLogJson.getString("topic");
        String tags = mqLogJson.getString("tags");
        String msg = JSON.toJSONString(mqLogJson);
        String key = mqLogJson.getString("key");
        Boolean approveResult = mqLogJson.getBoolean("approved");
        String approver = mqLogJson.getString("approver");
        String comment = mqLogJson.getString("comment");
        String finshTime = mqLogJson.getString("finshTime");

        MQLog.MQLogBuilder mqLogBuilder = MQLog.builder();
        mqLogBuilder.mqTopic(topic)
                .mqTags(tags)
                .key(key)
                .mqMessage(msg)
                .approveResult(approveResult)
                .approver(approver)
                .approveComment(comment)
                .approveFinishTime(finshTime)
                .regDateTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));

        MQLog savedMqLog = mqLogRepository.save(mqLogBuilder.build());
        logger.debug("登记 mq 消息完成。");
        return savedMqLog;
    }

    public void setMQlogError(MQLog mqLog, String errorCode, String errorMsg){
        mqLogRepository.save(mqLog.toBuilder().dealCode(errorCode).dealMsg(errorMsg).build());
    }

    public void setMQlogError(MQLog mqLog, String errorMsg){
        setMQlogError(mqLog,"Error",errorMsg);
    }

    public void setMQlogSuccess(MQLog mqLog){
        mqLogRepository.save(mqLog.toBuilder().dealCode("0000").dealMsg("success").build());
    }
}

3.简化代码

import cn.com.*.support.MQLogSupport;
import cn.com.*.entity.MQLog;
import cn.com.*.repository.MQLogRepository;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

@Component
public class MQLogSupportImpl implements MQLogSupport {
    private final Logger logger = LoggerFactory.getLogger(MQLogSupport.class);
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private MQLogRepository mqLogRepository;

    // 写消息服务
    public void sendMessageNoTag(MQLog mqLog) {
        rocketMQTemplate.asyncSend(mqLog.getMqTopic(), mqLog.getMqMessage(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                setMQlogSuccess(mqLog);
            }

            @Override
            public void onException(Throwable e) {
                setMQlogError(mqLog, "初次:" + e.getMessage());
                retrySendMessage(mqLog);
            }
        });
    }

    // 重试发送消息
    private void retrySendMessage(MQLog mqLog) {
        int retryCount = mqLog.getRetryCount();
        if (retryCount < 3) { // 最多重试3次
            mqLog.setRetryCount(retryCount + 1);
            mqLogRepository.save(mqLog);
            sendMessageNoTag(mqLog);
        } else {
            setMQlogError(mqLog, "重试3次仍然失败");
        }
    }

    // 定时任务,失败后重新写消息,每分钟执行一次
    @Scheduled(cron = "0 0/1 * * * ?")
    @SchedulerLock(name = "RocketMQ", lockAtLeastForString = "PT5M", lockAtMostForString = "PT30M")
    public void readAndResendMessage() {
        // 读取失败信息
        List<MQLog> messageList = mqLogRepository.findByDealCodeNot("0000");
        if (CollectionUtils.isEmpty(messageList)) {
            return;
        }
        // 重新写消息
        for (MQLog message : messageList) {
            rocketMQTemplate.asyncSend(message.getMqTopic(), message.getMqMessage(), new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    setMQlogSuccess(message);
                }

                @Override
                public void onException(Throwable e) {
                    setMQlogError(message, "定时任务:" + e.getMessage());
                    retrySendMessage(message);
                }
            });
        }
    }

    // 设置消息发送成功
    private void setMQlogSuccess(MQLog mqLog) {
        mqLog.setDealCode("0000");
        mqLog.setDealDesc("发送成功");
        mqLogRepository.save(mqLog);
    }

    // 设置消息发送失败
    private void setMQlogError(MQLog mqLog, String errorMsg) {
        mqLog.setDealCode("9999");
        mqLog.setDealDesc(errorMsg);
        mqLogRepository.save(mqLog);
    }
}

注意:

1) 配置文件

rocketmq:
  name-server: IP:端口
  producer:
    group: "G-Group_REQ"
    send-msg-timeout: 30000

2) 根据需要实现:

MQLogSupport、MQLog、MQLogRepository

标签:producer,mqLog,MQLog,mq,消息,org,import,public
From: https://www.cnblogs.com/ZhaoHS/p/17833168.html

相关文章

  • RabbltMQ Management使用
    说明,图是网上找的,仅供学习参考安装好RabbitMQ,开启rabbitmq_management插件并重启RabbitMQ服务后,可以访问控制台#启动后台管理rabbitmq-pluginsenablerabbitmq_management重启RabbitMQ服务,然后浏览器访问http://localhost:15672/账号:xxx 密码:xxx登录后的首页Overvie......
  • Kafka 分布式消息系统
    文章目录消息中间件对比Kafka概述kafka安装和配置kafka入门生产者发送消息消费者接收消息Kafka高可用设计集群备份机制(Replication)备份机制(Replication)-同步方式kafka生产者详解同步发送异步发送参数详解(ack)参数详解(retries)参数详解-消息压缩kafka消费者详解消费者组消息有......
  • 两个线程共享一个套接字,其中一个线程使用sendmsg函数不断发送消息到该套接字,另一个线
    以下是使用C语言写的一段代码,实现两个线程共享一个套接字,其中一个线程使用sendmsg函数不断发送消息到该套接字,另一个线程使用recvmsg函数不断接收该套接字的消息,并打印出来的功能点击查看代码#include<stdio.h>#include<stdlib.h>#include<string.h>#include<unistd.......
  • 消息通知(Notification)/用户触达系统设计
    近年来,通知功能已经成为许多应用程序中突出的特性。构建一个能每天发送数百万通知的可扩展系统绝非易事。这正是为什么我觉得有必要记录我在这方面踩坑之路。也叫用户触达系统。完成这项任务要求对通知生态系统有深刻的理解,否则需求很容易变得模糊和不明确。1了解通知系统并确......
  • 第六章 消息认证和哈希函数 —— 现代密码学(杨波)复习题
    第六章一、填空1.通信双方A和B通信,则可能发生哪两种形式的抵赖或欺骗?2.数字签名能够抵抗不可否认性攻击的原因是________________________________3.基于公钥加密的数字签名方式中,加密的消息应该是________________________4.直接方式的数字签名的公共弱点是______________......
  • FolkMQ 内存型消息中间件,v1.0.21 发布
    简介采用“多路复用”+"内存运行"+"快照持久化"+"Broker集群模式"(可选)+基于Socket.D网络应用协议开发。全新设计,自主架构!角色功能生产端发布消息(Qos0、Qos1)、发布定时消息(Qos0、Qos1)、发布重试消费端订阅、取消订阅消费端消费-ACK(自动、手......
  • 记录rabbitMQ的广播队列的错误使用导致未能正确广播的问题
    背景说明:有3个服务S1、S2、S3现在服务S1需要发布消息到广播交换机E,并建立了两个普通队列Q1,Q2,将其绑定到广播交换机E上服务S2和服务S3同时监听队列Q1,Q2本意是,服务S1通过广播交换机E把消息同时推送给服务S2和S3后面测试时,同事发现,服务S2和服务S3都只接收到了部分消息,而不是全......
  • 5G NR RRC协议解析—NR系统消息
    5GNRRRC协议解析—NR系统消息来源  https://zhuanlan.zhihu.com/p/606227190  01系统消息是什么?系统消息是由基站周期性地在下行链路的广播信息,其内容包含了该基站的基础配置信息、空口(Uu接口)协议层的关键参数等。因此,系统信息对于UE而言非常重要。对于任何移动通信系......
  • 清空ActiveMQ中的Scheduled延时队列
    要清空ActiveMQ中的Scheduled延时队列,可以执行以下步骤:停止ActiveMQ服务器。在ActiveMQ数据存储目录中找到存储延时消息的目录。该目录的默认位置是<activemq_home>/data/localhost/Scheduled.删除该目录下的所有文件,这将清空延时队列中的消息。启动ActiveMQ服务器。请注意......
  • Java中的消息队列(MQ)应用实践
    摘要:本文将介绍Java中消息队列(MQ)的概念、应用场景以及如何使用Java中的消息队列进行实践。我们将探讨如何使用Java消息队列实现异步通信、解耦和流量削峰等常见需求,并通过实际案例展示其应用。一、引言在分布式系统中,消息队列(MQ)是一种常见的中间件技术,用于实现异步通信和解耦。通过......