首页 > 其他分享 >ruoyi整合ActiveMQ-Stomp

ruoyi整合ActiveMQ-Stomp

时间:2023-04-21 19:35:23浏览次数:27  
标签:LOG ActiveMQ Stomp ruoyi stompConnection org import activemq String

https://www.cnblogs.com/SjhCode/p/ruoyiActiveMQ.html

ActiveMQ 在若依中的配置,这里使用的传输协议是stomp协议

消费时第一时间要确定对方打开了端口,是可以连接的状态。(cmd命令行测试telnet, ping)
ActiveMQ
消费消息consumeMsg:在处理业务时,需要判断JSON里面每一条数据有没有存在,当遇到不存在的JSON,需要抛出异常或者跳过本次,及时结束消费,以免造成死循环。
需要换多几条json测试,以免消费不成功

本地如何测试:

https://activemq.apache.org/download.html    ;下载ActiveMQ 

在\apache-activemq-5.xx.x\bin\win64 ,启动activemq.bat,不能关闭窗口

浏览器访问http://127.0.0.1:8161/  ,用户名admin ,密码admin

创建Queue ,Queue Name就是通道名称,(下面代码中,QUEUE_FLIGHT_ASSIGNPARK=“”; 需要和配合这里修改)

然后就是写完java代码之后启动项目,通过网站的对应Queue, 点击sent to;输入Message body;往通道里面放消息;

如果此时java代码没报错,并且打印出对应的log,说明就消费成功了,网站上会显示消息已经被消费了,待消费数量为0,消费者数量为1,消息出队+1;

接入测试:

cmd命令行测试telnet

对方提前往队列里面加消息

根据三方提供的host,port,user,password,queuename对应修改,连上后会消费到消息则对接成功

 

导入ActiveMQ需要的jar包

 

        <!--ActiveMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.5</version>
        </dependency>    
 

配置文件

 
#activemq
  activemq:
    host: 127.0.0.1
    port: 61613
    user: admin
    password: admin
    pool:
      enabled: true
      max-connections: 10
#  #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
#  pub-sub-domain: true
 

生产者

 
package com.ruoyi.web.controller.ActiveMQ;

import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;


import java.util.HashMap;
import java.util.UUID;
//生产者
@Component
public class JmsProducer {
    @Autowired
    JmsConsumer jmsConsumer;

    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.host}")
    private String host;

    @Value("${spring.activemq.port}")
    private Integer port;



    //连接
    public StompConnection connectionFactory() throws Exception {
        StompConnection conn = new StompConnection();
        conn.open(host, port);
        conn.connect(userName, password);
        return conn;
    }

    /**
     * 发送JMS消息
     *
     * @throws Exception exception
     */
    public void sendMessage(String queueName, String message)
        throws Exception {
        StompConnection stompConnection = connectionFactory();
        String tx = UUID.randomUUID().toString().replaceAll("-", "");
        HashMap<String, String> headers = new HashMap<>();
        headers.put(Stomp.Headers.Send.PERSISTENT, "true");
        stompConnection.begin(tx);
        stompConnection.send(queueName, message, tx, headers);
        stompConnection.commit(tx);
        stompConnection.disconnect();
    }


}
 

消费者

QUEUE_FLIGHT_ASSIGNPARK 是需要订阅通道名
package com.ruoyi.web.controller.ActiveMQ;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.admin.domain.entity.Dto.BullyingDto;
import com.ruoyi.admin.service.WarningService;
import com.ruoyi.common.websocket.WebSocket;
import com.ruoyi.web.controller.ActiveMQ.JmsProducer;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.SocketTimeoutException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;


import javax.annotation.Resource;
//消费者
@Component
public class JmsConsumer implements ApplicationListener<ContextRefreshedEvent> {
    private static Logger LOG = LoggerFactory.getLogger(JmsConsumer.class);
    @Resource
    public JmsProducer jmsProducer;
    @Resource
    private WebSocket webSocket;
    @Autowired
    private WarningService warningService;

    private static final String QUEUE_FLIGHT_ASSIGNPARK = "test001";

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            consumeMsg(QUEUE_FLIGHT_ASSIGNPARK);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }
    /**
     * 从mq中消费消息
     *
     * @param queueName
     * @throws Exception
     */

    public void consumeMsg(String queueName) throws Exception {
        LOG.info("*********从消息队列中获取结果、并推送给前端**************");
        new Thread(() -> {
            while (true) {
                StompFrame frame;
                String messageId = "";
                StompConnection stompConnection = null;
                try {
                    String ack = "client";
                    stompConnection = jmsProducer.connectionFactory();
                    stompConnection.subscribe(queueName, ack);
                    stompConnection.keepAlive();
                    // 注意,如果没有接收到消息,
                    // 这个消费者线程会停在这里,直到本次等待超时
                    long waitTimeOut = 30000;
                    frame = stompConnection.receive(waitTimeOut);
                    Map<String, String> headers = frame.getHeaders();
                    messageId = headers.get("message-id");
                    LOG.info("消息id:{}", messageId);
                    //具体的业务处理......
                    //取出推送信息中的msg
            String msg = frame.getBody(); LOG.info(msg);
            // 在ack是client标记的情况下,确认消息

            if ("client".equals(ack)) {
                      stompConnection.ack(messageId);
                      LOG.info("确认消息");
                    }
                } catch (SocketTimeoutException e) {
                    LOG.error(e.getMessage());
                    continue;
                } catch (JSONException ex) {
                    LOG.error("message_id:{},数据异常:{}", messageId, ex.getMessage());
                    continue;
                } catch (Exception e) {
                    LOG.error(e.getMessage());
                    continue;
                } finally {
                    try {
                        stompConnection.ack(messageId);
//                        LOG.info();
                        stompConnection.disconnect();
                    } catch (Exception e) {
                        LOG.error(e.getMessage());
                    }
                }
            }
        }).start();
    }
}

 

 

 

标签:LOG,ActiveMQ,Stomp,ruoyi,stompConnection,org,import,activemq,String
From: https://www.cnblogs.com/chuangsi/p/17341499.html

相关文章

  • ruoyi整合mqtt
    https://www.cnblogs.com/SjhCode/p/mqtt.htmlruoyi整合mqttmqtt报错客户机未连接32104,可能是没连接上,也可能是两个客户端clientID相同,也可能是同一台机子subscribe(Topic,Qos)订阅了多次在消费时,需要对方的通道有发送测试信息,我们才能取出来消费,消费完出队。 本地测试:https......
  • ruoyi整合WebSocket
    https://www.cnblogs.com/SjhCode/p/WebSocket.html ruoyi整合WebSocket这里使用WebSocket目的:向前端推送实时消息,配合ActiveMQ接入三方使用的导入maven依赖 <!--WebSocket--><dependency><groupId>org.java-websocket</groupId><......
  • 若依RuoYi框架浅析 基础篇①——日志logs本地保存
    文章目录日志保存位置在/home/ruoyi/logs/[root@iZ2ze30dygwd6yh7gu6lskZlogs]#cd/home/ruoyi/logs/[root@iZ2ze30dygwd6yh7gu6lskZlogs]#lssys-error.2021-02-28.logsys-error.logsys-info.2021-02-28.logsys-info.2021-03-01.logsys-info.logsys-user.2021-0......
  • ztree初始化时选中(ruoyi版)
    ruoyi版本:4.6.0问题描述将后台传入的参数放到$.tree中,当ztree的Node中checked为true时,Node默认为选中,目前前台调用代码varurl=ctx+"获得List<Ztree>的URL";varoptions={url:url,expandLevel:2,beforeClick:function(treeId,treeNode,clickFlag){......
  • Kubuesphere部署Ruoyi(三):持久化存储配置
    按照如下教程配置NFS先服务器:https://kubesphere.io/zh/docs/v3.3/reference/storage-system-installation/nfs-server/后客户端:https://kubesphere.io/zh/docs/v3.3/installing-on-linux/persistent-storage-configurations/install-nfs-client/按照链接操作以后,在客户端上......
  • Kubuesphere部署Ruoyi(二):部署kubesphere
    先决条件:更换DNS更换apt的镜像源Ubuntu下永久性修改DNSvi/etc/systemd/resolved.confDNS字段取消注释,并修改DNS为223.5.5.5223.5.5.5是一个IP地址,是AlibabaCloud提供的免费DNS服务器的IP地址。修改后保存。systemctlrestartsystemd-resolved清华镜像源https://m......
  • 玩转RuoYi-Cloud-Plus-3.Docker 搭建 MySQL8.0
    3.Docker搭建MySQL8.0 1、docker仓库搜索mysqldockersearchmysql2、docker仓库拉取mysql8.0dockerpullmysql:8.0备注:dockerpullmysql//默认拉取最新版本3、查看本地仓库镜像是否下载成功dockerimagesmysql:8.04、安装运行mysql8.0......
  • 玩转RuoYi-Cloud-Plus--6.搭建高可用Nacos集群
    Nacos高可用「集群部署」注册中心,在微服务中是核心基础组件,当然要保证服务高可用,避免单节点故障。官方部署文档:https://nacos.io/zh-cn/docs/cluster-mode-quick-start.html由于资源有限,在一台windows机器上,启动三个端口不同的节点测试。解压安装后,复制出三个节点文件夹 ......
  • 玩转RuoYi-Cloud-Plus-2.安装Docker-ce
    2.安装Docker-ce一、删除之前安装的docker(若之前未安装过,此步骤省略…)进入centos根目录执行以下命令(\是linux系统种命令换行符,如果命令过长,可以用\来换行)yumremovedocker\docker-client\docker-client-latest\docker-common\docker-latest\docker-latest-logr......
  • ava: 程序包com.alibaba.nacos.api.common不存在_RuoYi-Cloud-Plus-master_jar包不存
    来看看原因吧,jar包是存在的,但是就是在idea中引用不到,来看看怎么回事: 原来就是这个包找不到,但是从下面看是有的: 但是注意,这里的com.alibaba.nacos.api...原来可不是这样的,这个是我后来修改过的,原来是只有com.alibaba.nacos.common,而引用的是com.alibaba.nacos.api.commo......