SpringBoot整合 Canal+RabbitMQ 实现监听 MySQL 数据库同步更新 Redis 缓存,编写RabbitMQ 消费端监听同步缓存。
接收消息是字符串返回的是字节数据,eg:
-30,-128,-100,-25,-126,-71,-27,-81,-71,-25,-126,-71,-30,-128,-99
使用了jackson的converter后,报了如下的异常:
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found from actual payload type 'byte[]' to expected payload type 'com.lynch.entity.CanalMessage' Could not convert incoming message with content-type [null]。
解决办法:
重写Jackson2JsonMessageConverter的fromMessage方法,并将contentType强制为application/json。
import org.springframework.amqp.core.Message; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; public class JacksonMessageConverter extends Jackson2JsonMessageConverter { public JacksonMessageConverter() { super(); } @Override public Object fromMessage(Message message) { message.getMessageProperties().setContentType("application/json"); return super.fromMessage(message); } }
RabbitConfig类配置重新的Jackson2JsonMessageConverter类
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { // @Value("${spring.rabbitmq.listener.order.exchange.name}") // public String orderExchangeName; /** * 消息序列化配置 */ @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new JacksonMessageConverter()); return factory; } }
消息异常:
2023-05-29 15:39:01.957 WARN 10204 --- [ntContainer#0-4] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.lynch.rabbitmq.springboot.CanalListener.onMessage(org.springframework.messaging.Message<com.lynch.entity.CanalMessage>,com.rabbitmq.client.Channel) throws java.lang.Exception] Bean [com.lynch.rabbitmq.springboot.CanalListener@63fa752b] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:199) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1542) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1468) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1456) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1451) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201] Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found from actual payload type 'byte[]' to expected payload type 'com.lynch.entity.CanalMessage' at org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver.convertPayload(MessageMethodArgumentResolver.java:135) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver.resolveArgument(MessageMethodArgumentResolver.java:93) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:196) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] ... 12 common frames omitted 2023-05-29 15:39:01.957 WARN 10204 --- [ntContainer#0-2] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@722320c8(byte[1333])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal.routing.key, deliveryTag=6, consumerTag=amq.ctag-sd6w4IXfXBOIvvBpGxcsBA, consumerQueue=canal.queue]) 2023-05-29 15:39:01.958 WARN 10204 --- [ntContainer#0-4] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@79287307(byte[1330])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal.routing.key, deliveryTag=57, consumerTag=amq.ctag-CyFlV748Nc7FHyq3Rz7M6w, consumerQueue=canal.queue]) 2023-05-29 15:39:01.958 ERROR 10204 --- [ntContainer#0-4] o.s.a.r.l.SimpleMessageListenerContainer : Execution of Rabbit message listener failed, and the error handler threw an exception org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:105) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1368) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1621) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1414) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201] Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.lynch.rabbitmq.springboot.CanalListener.onMessage(org.springframework.messaging.Message<com.lynch.entity.CanalMessage>,com.rabbitmq.client.Channel) throws java.lang.Exception] Bean [com.lynch.rabbitmq.springboot.CanalListener@63fa752b] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:199) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1542) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1468) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1456) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1451) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1400) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] ... 6 common frames omitted Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found from actual payload type 'byte[]' to expected payload type 'com.lynch.entity.CanalMessage' at org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver.convertPayload(MessageMethodArgumentResolver.java:135) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver.resolveArgument(MessageMethodArgumentResolver.java:93) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:196) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] ... 12 common frames omitted 2023-05-29 15:39:01.958 ERROR 10204 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Execution of Rabbit message listener failed, and the error handler threw an exception org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:105) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1368) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1621) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1414) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201] Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.lynch.rabbitmq.springboot.CanalListener.onMessage(org.springframework.messaging.Message<com.lynch.entity.CanalMessage>,com.rabbitmq.client.Channel) throws java.lang.Exception] Bean [com.lynch.rabbitmq.springboot.CanalListener@63fa752b] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:199) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1542) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1468) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1456) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1451) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1400) [spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] ... 6 common frames omitted Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found from actual payload type 'byte[]' to expected payload type 'com.lynch.entity.CanalMessage' at org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver.convertPayload(MessageMethodArgumentResolver.java:135) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver.resolveArgument(MessageMethodArgumentResolver.java:93) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.9.RELEASE.jar:5.1.9.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:196) ~[spring-rabbit-2.1.8.RELEASE.jar:2.1.8.RELEASE] ... 12 common frames omitted 2023-05-29 15:39:12.866 WARN 10204 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
标签:canal,convert,incoming,java,springframework,rabbit,RELEASE,org,2.1 From: https://www.cnblogs.com/linjiqin/p/17440734.html