首页 > 其他分享 >排查MQ消息发送和接收

排查MQ消息发送和接收

时间:2024-12-09 18:58:44浏览次数:3  
标签:connectionFactory return String 发送 排查 MQ new public defaultCharset

排查MQ消息发送和接收

TemplateCodeSmsMq mq = new TemplateCodeSmsMq();
        mq.setMobile(record.getMobile());
        mq.setTemplateCode("mySmsCode1");
        Map<String, Object> map = new HashMap<>();
        map.put("plateNum", "1111");
        String url = "短链地址";
        map.put("url", getShortUrl(url));
        mq.setParams(map);
//        String sendMsg = new Gson().toJson(mq);
//        log.info("发送json1=" + JSON.toJSONString(mq));
//        log.info("发送json2=" + sendMsg);

        //原因是:这个地方是对象mq,而不是json字符串。
        rabbitTemplate.convertAndSend("test-exchange", "templateCode.test", mq);

报错日志:

Caused by: com.alibaba.fastjson.JSONException: can not cast to JSONObject.
at com.alibaba.fastjson.JSON.parseObject(JSON.java:260)
at com.common.FastJsonMessageConverter.fromMessage(FastJsonMessageConverter.java:44)

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to com.alibaba.fastjson.JSONObject
at com.alibaba.fastjson.JSON.parseObject(JSON.java:258)
... 17 common frames omitted

@Configuration
@Data
public class RabbitMQConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("10")
    private String prefetchCount;

    public static final String DEFAULT_EXCHANGE = "default-order-exchange";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate orderSmsRabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new FastJsonMessageConverter();
    }

    @Bean
    public DirectExchange directExchange() {
        DirectExchange directExchange = new DirectExchange(RabbitMQConfig.DEFAULT_EXCHANGE, true, false);
        return directExchange;
    }

    @Bean
    @Primary
    public SimpleRabbitListenerContainerFactory orderRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        factory.setConcurrentConsumers(Integer.valueOf(prefetchCount));
        return factory;
    }

}



import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import java.io.UnsupportedEncodingException;
    
public class FastJsonMessageConverter extends AbstractMessageConverter {    
     private static Logger log = LoggerFactory.getLogger(FastJsonMessageConverter.class);

    public static final String DEFAULT_CHARSET = "UTF-8";

    private volatile String defaultCharset = DEFAULT_CHARSET;

    public FastJsonMessageConverter() {
        super();
    }

    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset
                : DEFAULT_CHARSET;
    }

    @Override
    public Object fromMessage(Message message)
            throws MessageConversionException {
        String json = "";
        try {
            json = new String(message.getBody(), defaultCharset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return JSONObject.parseObject(json);
    }

    @SuppressWarnings("unchecked")
    public <T> T fromMessage(Message message, T t) {
        String json = "";
        try {
            json = new String(message.getBody(), defaultCharset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return (T) JSONObject.parseObject(json, t.getClass());
    }

    @Override
    protected Message createMessage(Object objectToConvert,
                                    MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = JSONObject.toJSONString(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);
    }
}

 

标签:connectionFactory,return,String,发送,排查,MQ,new,public,defaultCharset
From: https://www.cnblogs.com/oktokeep/p/18595837

相关文章

  • docker ps 命令查看容器ports字段为空排查
    目录1.问题描述2.排查过程2.1确认api-server容器端口是64432.2确认windows的6443是不是docker暴漏出来2.3 执行docker inspect命名,继续观察3.问题总结4.问题反思1.问题描述:windowsDockerdestop启用了kubernetes,k8s api-server http://127.0.0.1:6......
  • 接口超时日志排查分析-BeanUtils对象复制6秒及类型不一致复制异常,复制null属性被覆盖
    接口超时日志排查分析-BeanUtils对象复制6秒及类型不一致复制异常,复制null属性被覆盖解决,常见Bean拷贝框架的性能对比1.接口超时日志排查分析-BeanUtils对象复制6秒1.查询日志命令,分析接口的请求及响应的时长catproJectDock.log|grep-E"请求开始时间|请求正常消耗时间">>......
  • 排查 Pod 状态异常
    TerminatingPendingContainerCreating/WaitingCrashLoopBackOffImagePullBackOffTerminating有时候删除Pod一直卡在Terminating状态,一直删不掉,可以从以下方面进行排查。分析思路一、首先我们先了解下pod的删除流程:APIServer收到删除Pod的请求,Pod被标记删除,......
  • Linux主机安全入侵排查步骤
    1导语经常有用户报障系统被植入恶意程序,如挖矿软件、ddos攻击病毒、syn映射攻击病毒等,可以按照以下流程为用户排查入侵病毒类型:一、定位病毒进程对于用户反馈云主机性能卡顿,CPU和内存占用较高的情况:执行TOP命令,查看占用CPU较高的异常进程,一般多为80%以上,有个别病毒占......
  • 【 Kubernetes 集群】Pod 网络无法访问排查处理
    本文档介绍TKE集群中多场景下可能发生的常见网络问题,并给出对应的排查思路。当遇到此类问题时,建议您首先按照下文中的检查建议进行排查,若确认检查项无误后仍不能正常访问,请您联系我们寻求帮助。集群中不同节点上的容器(Pod)无法互访同一集群中不同节点上的Pod可以直接互......
  • solon 集成 rocketmq5 sdk
    使用rocketmq5是比较简单的事情。也有些同学对sdk原始接口会陌生,会希望有个集成的示例。<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>${rocketmq5.version}</version></depen......
  • 切换到MySQL数据库后,如果遇到连接错误,应该怎样排查?
    在切换到MySQL数据库后,如果遇到连接错误,可以按照以下步骤进行排查和解决:检查数据库驱动类型:确保在database.php文件中正确设置了数据库驱动类型为mysql。PbootCMS支持多种数据库驱动,如果设置不正确,可能会导致连接失败。验证数据库服务器地址:确认数据库服务器地址是否正确。......
  • MySQL版CPU使用率高的原因排查和解决方法
    使用关系数据库MySQL版时,如果您的CPU使用率很高或接近100%,会导致数据读写处理缓慢、连接缓慢、删除出现报错等,从而影响业务正常运行。问题原因CPU使用率高由多种原因导致,最常见的几种原因如下:1.慢sql:执行时间较长的SQL查询可能会占用大量的CPU资源,尤其是当查询涉及复杂......
  • php无法正常修改网站,如何排查和解决PHP网站修改问题
    如果您在修改PHP网站时遇到问题,可以按照以下步骤排查和解决问题:检查错误日志:查看服务器的错误日志文件,寻找具体的错误信息。这有助于确定问题的根本原因。确认文件权限:确保网站文件和目录的权限设置正确。通常,文件权限应设置为644,目录权限应设置为755。检查PHP版本:确保您的服......
  • 商业版vs开源版:一图看懂云消息队列 RocketMQ 版核心优势
    十年磨砺,应“云”而生云消息队列RocketMQ版是阿里云基于ApacheRocketMQ构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台。自2012年诞生于阿里巴巴集团的核心交易链路以来,RocketMQ经历了多次“双十一”的万亿级数据洪峰验证。2015年,阿里......