首页 > 其他分享 >【项目实战典型案例】16.消息队列作用和意义

【项目实战典型案例】16.消息队列作用和意义

时间:2023-03-20 10:39:01浏览次数:38  
标签:实战 16 队列 消息 println new public out


目录

  • ​​一:背景介绍​​
  • ​​二:消息队列​​
  • ​​消息队列简介​​
  • ​​解耦​​
  • ​​异步​​
  • ​​流量削峰​​
  • ​​原理​​
  • ​​1.ArrayBockingQueue:​​
  • ​​2.Socket​​
  • ​​3.SeverSocket​​
  • ​​4.Java IO操作——BufferedReader​​
  • ​​5.java.io.PrintWriter​​
  • ​​三:实现过程​​
  • ​​解耦和异步​​
  • ​​流量削峰​​
  • ​​四:总结​​

一:背景介绍

【项目实战典型案例】16.消息队列作用和意义_System

二:消息队列

消息队列简介

  • MQ全程为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。
  • 消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削峰等问题。

解耦

譬如签到送积分,签到和送积分是两个操作。签到产生了很重要的数据,它可以把消息发送到MQ,然后积分系统需要该数据,从MQ中直接获取即可。这样签到系统就做到了和积分系统解耦,不必担心积分系统挂了怎么办,是不是需要重试等,而这些都可以在积分系统内部自己实现,再者,如果以后另外一套系统也需要该签到数据,直接从MQ中获取即可,实际上与签单系统已无关系。

异步

当做到解耦后,实现异步就是自然而然的事情,如果签到只需要1ms,而送积分,或者其他操作需要500ms,那不可能等所有操作完成之后再去返回数据给用户,这样就做到了异步。

流量削峰

削峰是指当并发访问高峰期,通过MQ达到限流的目的,从而减少对数据库MySQL的压力

原理

1.ArrayBockingQueue:

ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,底层以数组的形式保存数据(实际上可看作一个循环数组)

ArrayBockingQueue使用场景:

1.先进先出队列:头是先进队的元素,尾是后进队的元素

2.有界队列:初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作

3.队列不支持空元素

阻塞式队列方法的四种形式:

【项目实战典型案例】16.消息队列作用和意义_System_02

2.Socket

创建一个Socket类的实例,用它来发送和接收字节流,发送时调用getOutputStream方法获取一个java.io.OutputStream对象,接收远程对象发送来的信息可以调用getInputStream方法来返回一个java.io.InputStream对象

3.SeverSocket

ServerSocket与Socket不同,ServerSocket是等待客户端的请求,一旦获得一个连接请求,就创建一个Socket示例来与客户端进行通信。

4.Java IO操作——BufferedReader

可以接收任意长度的数据,并且避免乱码的产生

5.java.io.PrintWriter

输出流、字符打印流

三:实现过程

解耦和异步

消息处理中心 Broker

public class Broker {
private final static int MAX_SIZE = 3;
private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);

public static void produce(String msg){
if(messageQueue.offer(msg)){
System.out.println("已成功向消息处理中心发送消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size());
} else{
System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
}
System.out.println("-----------------------------");
}

public static String consume(){
String msg = messageQueue.poll();
if(msg != null){
System.out.println("已经消费的消息:" + msg + ",当前暂存消息的数量是:" + messageQueue.size());
} else {
System.out.println("消息处理中心内没有可供消费的消息!");
}
System.out.println("-----------------------------");
return msg;
}
}

BrokerSever用来提供Broker类得对外服务,BrokerSever类实现Runnable接口,实现run方法。用new Thread(Runnable target).start()方法来启动

public class BrokerSever implements Runnable{
public static int SERVICE_PORT = 9999;
private final Socket socket;

public BrokerSever(Socket socket){
this.socket = socket;
}

@Override
public void run() {
try(
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream()))
{
while (true){
String str = in.readLine();
if (str == null){
continue;
}
System.out.println("接收到原始数据: " + str);
if (str.equals("CONSUME")){
String message = Broker.consume();
out.println(message);
out.flush();
}else {
Broker.produce(str);
}
}
} catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception{
ServerSocket server = new ServerSocket(SERVICE_PORT);
while(true){
BrokerSever brokerServer = new BrokerSever(server.accept());
new Thread(brokerServer).start();
}
}
}

ProduceClient消息生产者

public class ProduceClient {
public static void main(String[] args) throws Exception{
MyClient client = new MyClient();
client.produce("hello World.");
}
}

ConsumeClient消息消费者

public class ConsumeClient {
public static void main(String[] args) throws Exception{
MyClient client = new MyClient();

String message = client.consume();
System.out.println("获得的消息为: " + message);
}
}

MyClient与消息服务器进行通信

public class MyClient {
public static void produce(String message) throws Exception{
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
try(
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
out.println(message);
out.flush();
}
}
public static String consume() throws Exception{
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
try(
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
out.println("CONSUME");
out.flush();
String message = in.readLine();
return message;
}
}
}

流量削峰

定义一个消息生产者

@Test
public void test() throws Exception {
for (int i = 0; i < 1000 ; i++) {
rabbitTemplate.convertAndSend("test-queue ", "消息发送);
}
Thread.sleep(1000 * 1000);
}

使用@RabbitListener注解定义一个消息消费者

@Component
@RabbitListener(queuesToDeclare = @Queue(name = "test-queue"))
public class Consumer {
private int count = 0;
@RabbitHandler
public void receive(String msg, Channel channel, Message message) throws IOException {

long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
Thread.sleep(1000);
System.out.println("=====消息处理===>");
channel.basicAck(deliveryTag, true);
System.out.println("current count is:" + ++count);
} catch (Exception e) {

}
}
}

运行效果

【项目实战典型案例】16.消息队列作用和意义_System_03


Ready:待消费的消息总数

Unacked:待应答的消息总数。

Total:总数 Ready+Unacked

四:总结

大家可以参考一下这篇博客:​​消息队列的作用和整体介绍​​


标签:实战,16,队列,消息,println,new,public,out
From: https://blog.51cto.com/u_15903651/6132063

相关文章