首页 > 其他分享 >ActiveMQ经典的使用模式(利用多线程处理消费端)

ActiveMQ经典的使用模式(利用多线程处理消费端)

时间:2022-11-14 15:59:21浏览次数:54  
标签:thread connection 线程 经典 import 多线程 ActiveMQ public pool

今天看视频,里面讲了一个经典的例子,是工作中很常用的,特此将这种模式记录下来.这个例子使用了ActiveMQ的选择器,也使用了

之前学的自定义线程池.队列的使用,而且很好的利用多线程并发的处理了任务,提高了吞吐量.

首先看生产端:

package com.jvm.activemq.bhz.mq;

import com.jvm.util.PropertiesUtil;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.UnsupportedEncodingException;

/**
*非常经典的例子,将任务分发给消费者,消费者使用多线程来处理,很好的提高了系统的吞吐量
*
* */
public class Producter {
//1.连接工厂
private ConnectionFactory connectionFactory;
//2.连接对象
private Connection connection;
//3.Session对象
private Session session;
//4.生产者对象
private MessageProducer messageProducer;

public Producter() {
try {
//初始化连接
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
PropertiesUtil.getValue("activemq.properties", "url")
);
this.connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建生产者
messageProducer = this.session.createProducer(null);

} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}

public void send() {
try {
//创建队列
Destination destination = session.createQueue("first");
//循环发送100个消息,
for (int i = 0; i < 100; i++) {
MapMessage message = session.createMapMessage();
message.setInt("id", i);
message.setString("name", "张" + i);
message.setString("age", "" + i);
//根据id不同,把消息分为2中,一种奇数,一种偶数,也就是说消费端会使用选择器,有选择的处理消息
String receiver = i % 2 == 0 ? "A" : "B";
//注意使用setStringProperty()方法,选择器过滤是根据这个方法过滤的
message.setStringProperty("receiver", receiver);
//发送消息(非持久化,优先级为2,即普通消息,存活时间为1分钟
this.messageProducer.send(destination,message,DeliveryMode.NON_PERSISTENT,2,1000*60L);
System.out.println("Message send "+ i);
}
this.closeConnection(connection);
} catch (JMSException e) {
e.printStackTrace();
}
}

//关闭连接
public void closeConnection(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
Producter producter=new Producter();
producter.send();
}
}
然后创建2个消费端:

package com.jvm.activemq.bhz.mq;

import com.jvm.util.PropertiesUtil;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerA {
public static final String SELECTOR = "receiver='A'"; //特别注意此处的写法,必须加单引号,类似于sql语句
//1.连接工厂
private ConnectionFactory connectionFactory;
//2.连接对象
private Connection connection;
//3.Session对象
private Session session;
//4.生产者对象
private MessageConsumer messageConsumer;
//目的地
Destination destination;

public ConsumerA() {
try {
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
PropertiesUtil.getValue("activemq.properties", "url")
);
this.connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.destination = session.createQueue("first");
messageConsumer = this.session.createConsumer(destination, SELECTOR);
System.out.println("ConsumerA ...start");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}

public void receiver() {
try {
//利用消息监听器来实现接收消息,而不是while(true)
this.messageConsumer.setMessageListener(new Listener());
} catch (JMSException e) {
e.printStackTrace();
}
}
//此处很经典,很好的利用了多线程和队列的技术
class Listener implements MessageListener{
//创建一个有界阻塞队列
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
//创建一个自定义的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), //corePoolSize
20,//maximumPoolSize
120L, //keepAliveTime
TimeUnit.SECONDS, //单位
queue //使用的队列
);
@Override
public void onMessage(Message message) {

if (message instanceof MapMessage) {
//利用线程池开启多个线程去执行任务.相当于并行执行
MapMessage ret=(MapMessage) message;
//直接把任务交给多线程去处理
executor.execute(new MessageTask(ret));
}
if (message instanceof TextMessage) {
//处理流程
//....
}
}
}

public static void main(String[] args) {
ConsumerA consumerA=new ConsumerA();
consumerA.receiver();
}
}
消费者B

package com.jvm.activemq.bhz.mq;

import com.jvm.util.PropertiesUtil;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerB {
public static final String SELECTOR = "receiver='B'";//特别注意此处的写法
//1.连接工厂
private ConnectionFactory connectionFactory;
//2.连接对象
private Connection connection;
//3.Session对象
private Session session;
//4.生产者对象
private MessageConsumer messageConsumer;
//目的地
Destination destination;

public ConsumerB() {
try {
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
PropertiesUtil.getValue("activemq.properties", "url")
);
this.connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.destination = session.createQueue("first");
messageConsumer = this.session.createConsumer(destination, SELECTOR);
System.out.println("ConsumerB ...start");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}

public void receiver() {
try {
this.messageConsumer.setMessageListener(new Listener());
} catch (JMSException e) {
e.printStackTrace();
}
}
class Listener implements MessageListener{
//创建一个队列
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
//创建一个自定义的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), //corePoolSize
20,
120L,
TimeUnit.SECONDS,
queue
);
@Override
public void onMessage(Message message) {
if (message instanceof MapMessage) {
//利用线程池开启多个线程去执行任务.相当于并行执行
MapMessage ret=(MapMessage) message;
//直接把任务交给多线程去处理
executor.execute(new MessageTask(ret));
}
}
}

public static void main(String[] args) {
ConsumerB consumerA=new ConsumerB();
consumerA.receiver();
}
}
处理的任务:(使用Thread.sleep(500)来模拟处理的时间)

package com.jvm.activemq.bhz.mq;

import javax.jms.JMSException;
import javax.jms.MapMessage;
//消息任务,实现了Runable接口
public class MessageTask implements Runnable {

MapMessage mapMessage;
public MessageTask(MapMessage ret) {
this.mapMessage = ret;
}

@Override
public void run() {
try {
Thread.sleep(500);
System.out.println("当前线程:"+Thread.currentThread().getName()+"处理任务"+this.mapMessage.getString("id"));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
程序的运行结果:消费者A接收到的是偶数(由于版面的原因,省略了部分结果),消费者B接收的是奇数

ConsumerA ...start
当前线程:pool-1-thread-2处理任务2
当前线程:pool-1-thread-1处理任务0
当前线程:pool-1-thread-3处理任务4
当前线程:pool-1-thread-8处理任务14
当前线程:pool-1-thread-5处理任务8
当前线程:pool-1-thread-4处理任务6
当前线程:pool-1-thread-7处理任务12
当前线程:pool-1-thread-9处理任务16
当前线程:pool-1-thread-6处理任务10
当前线程:pool-1-thread-11处理任务20
消费者B:

ConsumerB ...start
当前线程:pool-1-thread-1处理任务1
当前线程:pool-1-thread-2处理任务3
当前线程:pool-1-thread-6处理任务11
当前线程:pool-1-thread-5处理任务9
当前线程:pool-1-thread-3处理任务5
当前线程:pool-1-thread-4处理任务7
当前线程:pool-1-thread-11处理任务21

生产者:

Message send 0
Message send 1
Message send 2
Message send 3
Message send 4
Message send 5
Message send 6
Message send 7
Message send 8
Message send 9
结果: 因为每个任务处理的时间是500毫秒,而100个就是50秒,2个消费者单线程是25秒,而利用多线程技术大约只需要2秒就可以处理完.很好的提高了吞吐量.
————————————————
版权声明:本文为CSDN博主「mango奇」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_33804730/article/details/79846120

标签:thread,connection,线程,经典,import,多线程,ActiveMQ,public,pool
From: https://www.cnblogs.com/telwanggs/p/16889236.html

相关文章

  • 响应式编程(反应式编程)的来龙去脉(同步编程、多线程编程、异步编程再到响应式编程)
    响应式编程的来龙去脉(同步编程、多线程编程、异步编程再到响应式编程)文章目录​​响应式编程的来龙去脉(同步编程、多线程编程、异步编程再到响应式编程)​​​​简介​​​​......
  • activemq环境搭建
    1:下载activemq安装包 https://archive.apache.org/dist/activemq/5.16.3/apache-activemq-5.16.3-bin.zip2:上传activemq安装到服务器指定目录进行unzip apache-active......
  • Java多线程(一)
    一.线程的生命周期及五种基本状态关于Java中线程的生命周期,首先看一下下面这张较为经典的图:   上图中基本上囊括了Java中多线程各重要知识点。掌握了上图中的各知......
  • Java多线程简介
    一、线程简介Process进程与Thread线程程序是指令和数据的有序集合,本身没有任何运行的含义,为静态概念。进程是执行程序的一次执行过程,为动态概念。是系统资源分配的单位......
  • Java多线程中的ThreadLocal线程本地变量
    概论ThreadLocal指的是开辟一块统一初始化的空间,在这个区域块里每个线程使用的区域独立,互不干扰。一般创建为对象的静态属性。常用方法:set(value)get(value)子类的initia......
  • Java中指令重排在多线程中出现数据错误的例子
    概述听说当两条指令互相不依赖的时候,在cpu或者jvm那儿可能会为了提高性能而进行指令重排。数据依赖比如下面两条代码就没有数据依赖:int a=5;int b=2;这两条指......
  • 读者-写者(多线程)
     一、同步互斥问题-读者写者问题之写者优先(一)问题要求抽象解释:多个进程访问一个共享的数据区读者(读进程)只能读数据,写者(写进程)只能写数据适用于数据库、文件、内存......
  • 读者-写者(多线程)
    读者-写者(多线程)0推荐在openEuer上实现1描述操作系统中“读者-写者”问题,理解问题的本质,提交你理解或查找到的文本资料2利用多线程完成reader和writer3在main中测......
  • 创建多线程的方法四
    packagedaybyday;/*好处:1.提高相应速度(减少创建新线程的时间)2.降低资源消耗3.便于线程管理corePoolSize核心池的大小maximumPool最大线......
  • 实现多线程的方法三
    packagedaybyday;/*好处:call()可以有返回值call()可以抛出异常,被外面的操作捕获,获取异常的数值Callable是支持泛型的*/importjava.util.concurrent......