首页 > 其他分享 >rabbitmq fanout(广播)模式

rabbitmq fanout(广播)模式

时间:2022-10-26 15:33:30浏览次数:54  
标签:String rabbitMQ Value factory private 广播 fanout rabbitmq channel

配置信息

bbitMQ:
  host: rabbitmq.com
  port: 0000
  username: 0000
  password: 0000
  topic: amq.fanout
  queueName: icost-beta
  virtualHost: icost-beta
  #开启发送确认机制,将来消息到达交换机以后有一个回调
  publisher-confirm-type: correlated
  #消息到达消息队列回调(如果消息没有成功到达队列,会触发回调方法)
  publisher-returns: true
  template:
    retry:
      enabled: true  # 开启重发机制
      initial-interval: 1000ms #间隔 1秒
      max-attempts: 6    #最多发6次
      multiplier: 1.2 #每次间隔 时间*1.2
      max-interval: 10000ms  #每次最大间隔时间
  listener:
    simple:
      acknowledge-mode: manual

生产者

    @Value("${rabbitMQ.username}")
    private String username;

    @Value("${rabbitMQ.password}")
    private String password;

    @Value("${rabbitMQ.host}")
    private String host;

    @Value("${rabbitMQ.port}")
    private int port;

    @Value("${rabbitMQ.topic}")
    private String topic;

    //声明队列
    @Value("${rabbitMQ.queueName}")
    private String  queueName;

    @Value("${rabbitMQ.virtualHost}")
    private String  virtualHost;
//调用方法
private void notifyMQProcessComplete(String businessKey, String domain, String project, String instanceId , int result) {
MQMessage message = new MQMessage();
message.setBusinessKey(businessKey);
message.setDomainId(project);
message.setDomainType(domain);
message.setInstanceId(instanceId);
message.setStatus(result);

try {
sendProducer(message);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
/**
     * 发布信息
     */
    private void sendProducer (MQMessage mqMessage)throws IOException, TimeoutException{
        Gson gson = new Gson();
        // 发送同步消息
        String messageStr = gson.toJson(mqMessage);
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
        //设置 RabbitMQ 地址
        factory.setHost(host);
        factory.setPort(port);
        factory.setVirtualHost(virtualHost);
        //建立到代理服务器到连接
        Connection conn = factory.newConnection();
        //获得信道
        Channel channel = conn.createChannel();
        //声明交换机(参数为:交换机名称; 交换机类型,广播模式)
        channel.exchangeDeclare(topic, BuiltinExchangeType.FANOUT,true);
//消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可)
        channel.basicPublish(topic, "", true,null,messageStr.getBytes(StandardCharsets.UTF_8));
        channel.close();
        conn.close();

    }

消费者

/**
 * 监听
 */
@Component
public class RocketConsumerListener implements CommandLineRunner {

    @Autowired
    private RocketMQConsumer rocketMQConsumer;
    protected final Logger logger = LoggerFactory.getLogger(RocketConsumerListener.class);
    
    @Override
    public void run(String... args) throws IOException, TimeoutException {
        logger.info("========消费者启动==========");
        rocketMQConsumer.normalSubscribe();
    }
}
@Component
@Slf4j
public class RocketMQConsumer {
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    BidProcessMessageHandler handler;

    @Value("${rabbitMQ.username}")
    private String username;

    @Value("${rabbitMQ.password}")
    private String password;

    @Value("${rabbitMQ.host}")
    private String host;

    @Value("${rabbitMQ.port}")
    private int port;


    @Value("${rabbitMQ.topic}")
    private String topic;

    @Value("${rabbitMQ.queueName}")
    private String  queueName;

    @Value("${rabbitMQ.virtualHost}")
    private String  virtualHost;



    /**
     * 订阅消息,处理业务
     */
    public void normalSubscribe() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
        //设置 RabbitMQ 地址
        factory.setHost(host);
        factory.setPort(port);
        factory.setVirtualHost(virtualHost);

        //建立到代理服务器到连接
        Connection conn = factory.newConnection();
        //获得信道
        final Channel channel = conn.createChannel();
        //声明交换机
        channel.exchangeDeclare(topic, ExchangeTypes.FANOUT,true);
        //绑定临时队列与交换机并设置获取交换机中动态路由
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, topic, "");
        //消费消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //接收到的消息内容
                    String msg = new String(body);
                    MQWorkflowMessageBody messageBody = objectMapper.readValue(msg, MQWorkflowMessageBody.class);
                    if ("bid".equals(messageBody.getDomainType()) && StringUtils.isNotBlank(messageBody.getBusinessKey()) && StringUtils.isNotBlank(String.valueOf(messageBody.getStatus()))) {
                        log.info("bid收到消息messageID:" + " msg:" + msg);
                        handler.processRequest(messageBody.getBusinessKey(), String.valueOf(messageBody.getStatus()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        });

    }

 

标签:String,rabbitMQ,Value,factory,private,广播,fanout,rabbitmq,channel
From: https://www.cnblogs.com/dahei96/p/16828565.html

相关文章

  • Java实现rabbitMQ向MQ里推送消息
    配置文件application.properties里配置rabbitMQ的ip和端口,如果没有配置username和password,就是mq安装时默认的用户名密码:spring.rabbitmq.host=127.0.0.1spring.rabbitm......
  • 安卓广播常用代码介绍1——广播的介绍
    首先声明一下,水平有限,说明错误,请多多指正,谢谢。广播作用接受或者发出广播,可以使应用程序得到自己关心的广播内容(如系统的开关机,WiFi开关机,或者其他应用程序的广播内容),或者发......
  • 消息队列之RabbitMQ介绍与运用
    RabbitMQ说明本章,我们主要从RabbitMQ简介、RabbitMQ安装、RabbitMQ常用命令、RabbitMQ架构模式、RabbitMQ使用、Quick.RabbitMQPlus的使用和RabbitMQ总结这几个方面对R......
  • 快速启动rabbitmq
    文档说明:只记录关键地方;试验环境:linuxdebian11rabbitmqversion:"3"services:rabbitmq-server:image:rabbitmq:3-management-alpineconta......
  • 02-rabbitMq
    RabbitMQ说明本章,我们主要从RabbitMQ简介、RabbitMQ安装、RabbitMQ常用命令、RabbitMQ架构模式、RabbitMQ使用、Quick.RabbitMQPlus的使用和RabbitMQ总结这几个方面对R......
  • RabbitMQ手动确认
    pom依赖<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>yml......
  • RabbitMQ.Client.Exceptions.BrokerUnreachableException:“None of the specified en
    RabbitMQ和程序运行在同一台电脑上可以使用guest账号访问如果不在同一台电脑,就会报错RabbitMQ.Client.Exceptions.BrokerUnreachableException:“Noneofthespecified......
  • centos8下安装RabbitMQ
    1、查看RabbitMQdockersearchrabbitmq2、拉取RabbitMQdockerpullrabbitmq3、创建、运行RabbitMQdockerrun-d--hostnameirabbitmq--namerabbi......
  • 【.NET 6】RabbitMQ延迟消息指南
    背景最近遇到一个比较特殊需求,需要修改一个的RabbitMQ消费者,以实现在消费某种特定的类型消息时,延迟1小时再处理,几个需要注意的点:延迟是以小时为单位不是所有消息都延迟......
  • docker安装rabbitmq(win10已安装Docker Desktop)
    打开rabbitmq官方网站:DownloadingandInstallingRabbitMQ—RabbitMQ如图所示: 在WindowsPowerShell中运行:dockerrun-it--rm--namerabbitmq-p5672:5672-p......