首页 > 其他分享 >RabbitMQ-工作模式

RabbitMQ-工作模式

时间:2022-10-04 19:44:42浏览次数:57  
标签:String factory 模式 工作 RabbitMQ import pickle final channel

1.Hello_wordl!

代码实现

consumer

package cn.pickle.consumer; 

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;  

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/3 16:33
 */
public class Consumer_Hello {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPassword("pickle");
        factory.setUsername("pickle");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        Consumer comsurmer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后自动执行该方法。
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " + consumerTag);
                System.out.println("Exchange: " + envelope.getExchange());
                System.out.println("RoutingKey: " + envelope.getRoutingKey());
                System.out.println("Properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        };
        channel.basicConsume("hello_world",true,comsurmer);
    }
}

producer

package cn.pickle.producer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @describe 发送消息
 * @author Pickle
 * @version V1.0
 * @date 2022/10/3 15:08
 */

public class Producer_Hello {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置参数,
        factory.setHost("localhost");       //IP地址
        factory.setPort(5672);              //端口,默认值5672
        factory.setVirtualHost("/");        //设置虚拟机,默认值"/"
        factory.setUsername("pickle");
        factory.setPassword("pickle");
        //创建连接
        final Connection connection = factory.newConnection();
        //创建channel
        final Channel channel = connection.createChannel();
        //创建消息队列
        /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
         */
        channel.queueDeclare("hello_world", true, false, false, null);
        //发送消息
        String body = "hello rabbitmq~~~~";
        channel.basicPublish("","hello_world",null,body.getBytes());

        //释放资源
        channel.close();
        connection.close();
    }
}

2、工作队列

producer

package cn.pickle.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/3 17:36
 */
public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setPassword("pickle");
        factory.setUsername("pickle");
        factory.setPort(5672);
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        //创建消息队列
        channel.queueDeclare("work_queues",true,false,false,null);
        for (int i = 1; i <= 10; i++) {
            String body = i + "hello rabbitmq~~~";
            channel.basicPublish("","work_queues",null,body.getBytes());
        }
    }
}

consumer1

package cn.pickle.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/3 17:47
 */
public class Consumer_WorkQueues1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPassword("pickle");
        factory.setUsername("pickle");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        Consumer comsurmer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后自动执行该方法。
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: " + consumerTag);
//                System.out.println("Exchange: " + envelope.getExchange());
//                System.out.println("RoutingKey: " + envelope.getRoutingKey());
//                System.out.println("Properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        };
        channel.basicConsume("work_queues",true,comsurmer);
    }
}

consumer2

package cn.pickle.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/3 17:47
 */
public class Consumer_WorkQueues1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPassword("pickle");
        factory.setUsername("pickle");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        Consumer comsurmer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后自动执行该方法。
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: " + consumerTag);
//                System.out.println("Exchange: " + envelope.getExchange());
//                System.out.println("RoutingKey: " + envelope.getRoutingKey());
//                System.out.println("Properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        };
        channel.basicConsume("work_queues",true,comsurmer);
    }
}

先启动两个消费者,再启动生产者.效果如下

总结

  • 消费者对于同一个消息是竞争关系
  • 消费者轮流的从消息队列中读取消息

3、Pub/Sub工作模式

引入了交换机

Producer_PubSub

package cn.pickle.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/4 16:05
 */
public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("pickle");
        factory.setPassword("pickle");
        factory.setVirtualHost("/");

        final Connection connection = factory.newConnection();

        final Channel channel = connection.createChannel();

        //创建交换机
//        String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments
//        exchange:交换机名称
//        type:
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);

        //创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //绑定队列和交换机
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        //发送消息
        String body = "日志信息:张三调用了fanout方法.... 日志级别:info...";
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        channel.close();
        connection.close();
    }
}

Consumer_PubSub1

package cn.pickle.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/4 16:25
 */
public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        factory.setUsername("pickle");
        factory.setPassword("pickle");

        final Connection connection = factory.newConnection();

        final Channel channel = connection.createChannel();

        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";

        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息打印到控制台....");
            }
        };

        channel.basicConsume(queue1Name,true,consumer);
    }
}

Consumer_PubSub2

package cn.pickle.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/4 16:25
 */
public class Consumer_PubSub2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        factory.setUsername("pickle");
        factory.setPassword("pickle");

        final Connection connection = factory.newConnection();

        final Channel channel = connection.createChannel();

        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";

        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息保存数据库....");
            }
        };

        channel.basicConsume(queue2Name,true,consumer);
    }
}

结果展示

4、路由工作模式

Producer_Routing

package cn.pickle.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/4 16:45
 */
public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("pickle");
        factory.setPassword("pickle");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        String exchangeName="test_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);

        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        channel.queueBind(queue1Name,exchangeName,"error");

        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"warning");

        String body = "日志信息:张三调用了routing方法... 日志等级:info...";

        channel.basicPublish(exchangeName,"warning",null,body.getBytes());

        channel.close();
        connection.close();
    }
}

Consumer_Routing1

package cn.pickle.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/4 17:26
 */
public class Consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setPassword("pickle");
        factory.setUsername("pickle");
        factory.setVirtualHost("/");
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息打印到控制台....");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);
    }
}

Consumer_Routing2

package cn.pickle.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/4 17:26
 */
public class Consumer_Routing2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setPassword("pickle");
        factory.setUsername("pickle");
        factory.setVirtualHost("/");
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息保存到数据库....");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);
    }
}

5、Topic模式

Producer_Topic

package cn.pickle.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/4 19:09
 */
public class Producer_Topic {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("pickle");
        factory.setPassword("pickle");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        String exchangeName="test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);



        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");

        channel.queueBind(queue2Name,exchangeName,"*.*");

        String body = "我TM是一条消息...";
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());

    }
}

Consumer_Topic1

package cn.pickle.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/4 19:26
 */
public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setPassword("pickle");
        factory.setUsername("pickle");
        factory.setVirtualHost("/");
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息保存到数据库....");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);
    }
}

Consumer_Topic2

package cn.pickle.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Pickle
 * @version V1.0
 * @date 2022/10/4 19:26
 */
public class Consumer_Topic2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setPassword("pickle");
        factory.setUsername("pickle");
        factory.setVirtualHost("/");
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息打印到控制台....");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);

    }
}

标签:String,factory,模式,工作,RabbitMQ,import,pickle,final,channel
From: https://www.cnblogs.com/poteitoutou/p/16750846.html

相关文章

  • 从vue源码中学习观察者模式
    摘要:源码解读设计模式系列文章将陆陆续续进行更新中~摘要:源码解读设计模式系列文章将陆陆续续进行更新中~观察者模式首先话题下来,我们得反问一下自己,什么是观察者模式?......
  • 设计模式:访问者模式
    访问者模式诞生的思维过程访问者模式难理解、难实现,应用它会导致代码的可读性、可维护性变差,所以,访问者模式在实际的软件开发中很少被用到,在没有特别必要的情况下,建议你不......
  • 从vue源码中学习观察者模式
    摘要:源码解读设计模式系列文章将陆陆续续进行更新中~摘要:源码解读设计模式系列文章将陆陆续续进行更新中~观察者模式首先话题下来,我们得反问一下自己,什么是观察者模式......
  • 设计模式-单例模式
    单例模式的英文叫做singleton模式,我先说一下,单例模式是怎么回事,就是,在你的系统里,你要判断一下,如果有一些类,只需要一个实例就可以了,那就给那个类,做成单例的模式。实际上我......
  • Canal + RabbitMQ 实现监听 MySQL 数据库
    第一步:开启Mysql Biglog日志,Mysql8.0以上默认开启日志(window路径:C:\ProgramData\MySQL\MySQLServer8.0\mysql.ini)1.添加配置[mysqld]log-bin=mysql-bin#开启bi......
  • 不同网络连接模式下kali虚拟机的IP
    文章内容简单介绍:分别观察kali虚拟机在桥接模式和NAT模式下的IP地址的变化一、桥接模式物理主机的IP地址通过观察后发现为 192.168.43.103 kali......
  • rabbitmq安装
    rabbitmq简介AMQP,即advancemessagequeuingprotocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中......
  • 并发学习记录15:工作线程
    定义就是让有限的工作线程来轮流异步处理无限多的任务。典型实现就是线程池,线程个数是有限的,但是任务是源源不断需要被处理的假设一个饭店有服务员线程,需要轮流处理客户......
  • 桥接模式【Java设计模式】
    桥接模式【Java设计模式】​​前言​​​​推荐​​​​桥接模式​​​​介绍​​​​实现​​​​最后​​前言2022/9/2313:34路漫漫其修远兮,吾将上下而求索本文是根据袁......
  • 笔记一:机器学习工作流程
    目录1理解问题和背景1.1目的1.2工作环境1.3获取数据2探索性数据分析(EDA)3数据预处理3.1数据清理3.2特征选择3.3特征工程3.4特征缩放4模型探索根据Geron(2019)......