首页 > 其他分享 >springboot下使用rabbitMQ之开发配置方式(二)

springboot下使用rabbitMQ之开发配置方式(二)

时间:2023-07-18 20:12:11浏览次数:32  
标签:java springboot 配置 springframework listener rabbitMQ rabbit org amqp

springboot下使用rabbitMQ之传参及序列化(二)

消息参数传递在开发中也是个坑,不论使用内置的SimpleMessageConverter还是Jackson2JsonMessageConverter均无法让Consumer接收动态参数

一.序列化的问题

首先贴出具体代码以及测试用例:

  • 消费者
    @RabbitListener(queues = "text.queue")
    @RabbitHandler(isDefault = true)
    public void exec(@Payload Map dto, Message message, Channel channel){
        // 注意,发送的消息类型必须是实现了Serializable接口的类型,消费者接口类型不能随便写!
        LOG.info(RabbitMQCfgEnum.TEXT +"接收到消息:{}", dto);
        // 设置手动确认才会需要执行此
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(),Boolean.TRUE);
    }
  • 生产者(测试用例)
    private Connection buidConnection()throws Exception{
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();//MQ采用工厂模式来完成连接的创建
        //2.在工厂对象中设置连接信息(ip,port,virtualhost,username,password)
        factory.setHost("10.156.122.215");//设置MQ安装的服务器ip地址
        factory.setPort(5672);//设置端口号
        factory.setVirtualHost("vhost");//设置虚拟主机名称
        //MQ通过用户来管理
        factory.setUsername("shadow");//设置用户名称
        factory.setPassword("shadow");//设置用户密码
        //3.通过工厂对象获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
    
    @Test
    public void test02()throws Exception{
        Connection connection = this.buidConnection();
        Channel channel = connection.createChannel();
        Map<String,Object> data = new HashMap<>(4);
        data.put("ordeNo", SeqGenUtil.genSeq());
        data.put("timestamp",System.currentTimeMillis());
        String json = JacksonUtil.toJsonString(data);
        channel.basicPublish(RabbitMQCfgEnum.TEXT.exchange, RabbitMQCfgEnum.TEXT.routingKey, null,json.getBytes(StandardCharsets.UTF_8));
        //关闭连接
        channel.close();
        connection.close();
    }

执行测试用例,它居然抛错了:

2023-07-17 14:10:24.037 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.rabbit.retry.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[B@3ad45d58(byte[55])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=text_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-IenKeUIVEFy1vBz1jQenVw, consumerQueue=text.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(java.util.Map,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@7839ec46]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy107.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.util.Map] for GenericMessage [payload=byte[55], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=c28511a8-1bbe-9d57-879c-c5763ba40129, amqp_consumerTag=amq.ctag-IenKeUIVEFy1vBz1jQenVw, amqp_lastInBatch=false, timestamp=1689574224032}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted
2023-07-17 14:10:24.038 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.r.listener.ConditionalRejectingErrorHandler:170 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76)
	at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:78)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157)
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy107.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null
	... 19 common frames omitted
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(java.util.Map,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@7839ec46]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	... 14 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.util.Map] for GenericMessage [payload=byte[55], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=c28511a8-1bbe-9d57-879c-c5763ba40129, amqp_consumerTag=amq.ctag-IenKeUIVEFy1vBz1jQenVw, amqp_lastInBatch=false, timestamp=1689574224032}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted

看吧json字符串是无法序列化为Map

标签:java,springboot,配置,springframework,listener,rabbitMQ,rabbit,org,amqp
From: https://www.cnblogs.com/funnyzpc/p/17563988.html

相关文章

  • linux---python虚拟环境配置
    1.安装pipsudoaptinstallpython3-pip-ihttps://pypi.douban.com/simple/2.安装虚拟环境pip3installvirtualenv-ihttps://pypi.douban.com/simple/3.安装Python环境管理工具pip3installvirtualenvwrapper-ihttps://pypi.douban.com/simple/4在当前用户家目......
  • jenkins配置文件
    [root@localhost~]#rpm-qljenkins/etc/init.d/jenkins/etc/logrotate.d/jenkins/etc/sysconfig/jenkins/usr/bin/jenkins/usr/lib/systemd/system/jenkins.service/usr/sbin/rcjenkins/usr/share/java/jenkins.war/usr/share/jenkins/usr/share/jenkins/migrate......
  • Kafka 集群参数配置介绍
    目录Broker端存储信息相关参数ZooKeeper相关参数Topic相关参数数据存留相关参数Topic级别参数保存消息JVM参数操作系统参数文件描述符限制文件系统类型Swappiness提交时间Broker端Broker端参数也被称为静态参数(StaticConfigs),必须在Kafka的配置文件server.properties......
  • centos7 下全局配置最新版的golang语言开发环境
    按照以下步骤进行操作:前往Go官方网站下载页面(https://golang.org/dl/)查找最新版本的Go二进制文件。使用wget命令下载最新版本的Go二进制文件。例如,如果最新版本是1.17,执行以下命令:$wgethttps://golang.org/dl/go1.17.linux-amd64.tar.gz解压缩下载的压缩包:$sudota......
  • netcore模型配置
    模型配置可以通过FluentAPI和注解的方式FluentAPI步骤新建Products和Category类新建Products类ProductspublicclassProduct{publicintId{get;set;}publicstringName{get;set;}publicdecimalPrice{get;set;}......
  • SpringBoot如何解决跨域问题
    什么是跨域跨域问题的本质是浏览器为了保证用户的一种安全拦截机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。跨域三种情况在发起请求时,如果出现了以下情况中的任意一种,那么它就是跨域请求:协议不同,如http和https;域名不同;端口不同。也就是说,即使域名相......
  • @NacosConfigListener注解监听配置变化失效解决
    项目初始配置:maven依赖如下:<!--配置与发现依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency&g......
  • win10小狼毫配置实操笔记
    下载安装进入官方网站下载最新版小狼毫,安装后选择朙(明)月拼音。在配置前阅读官方文档有关用户文件夹和共享文件夹的介绍。default.yaml用来调试全局方案,定制该文件后切换其他比如五笔、双拼等方案时,其定制内容依旧适用。weasel.yaml用来修改rime的常规设置,定制外观。symbol......
  • 面试官:一个 SpringBoot 项目能处理多少请求?(小心有坑)
    你好呀,我是歪歪。这篇文章带大家盘一个读者遇到的面试题哈。根据读者转述,面试官的原问题就是:一个SpringBoot项目能同时处理多少请求?不知道你听到这个问题之后的第一反应是什么。我大概知道他要问的是哪个方向,但是对于这种只有一句话的面试题,我的第一反应是:会不会有坑?所以并......
  • H3C交换机SSH登录配置截图
    使用console线连接电脑和设备,登录设备界面。首先进行第一步:创建用户创建用户后,然后设置vty允许的管理方式:最后一步:开启ssh功能......