应用场景:异步处理、应用解耦、流量消峰
简介:Apache出品,是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现
JMS消息模型:
1) P2P 点对点模型(Queue队列模型)
特点:每个消息只有一个消费者(消息一旦被消费,消息就不再存在消息队列中)
生产者和消费者之间在时间上没有依赖性,不需要同时处于运行状态
消费者在成功消费之后,需要向队列应答成功
2) Publish/Subscribe(Pub/Sub) 发布/订阅模式(Topic主题模型)
特点:每个消息可以有多个消费者
发布者和订阅者有时间依赖性(先订阅主题,再发送消息)
订阅者必须保持运行状态,才能接收到消息
JMS编程API:
1)ConnectionFactory
创建Connection对象的工厂,针对两种模型分别有QueueConnection和TopicConnectionFactory
2)Destination
对消费者来说是消息来源,某个队列(Queue)或某个主题(Topic),对生产者来说是某个队列(Queue)或某个主题(Topic)
3)Connection
表示客户端与JMS系统之间建立的链接(对TCP/IP socket的包装) ,Connection可以产生一个或多个Session
4) Session
是对消息进行操作的接口,可以创建生产者、消费者、消息等。也提供了事务功能,如果需要可以将发送/接收消息的动作放到一个事务中
5)Producter
生产者:由Session创建,并用于将消息发送到Destination对象中。分两种类型 QueueSender和TopicPublisher,可调用生产者的send或publish方法发送消息
6)Consumer
消费者:由Session创建,用于接收Destination中的消息。分两种类型 QueueReceiver和TopicSubscriber,分别通过createReceiver(Queue)或createSubScriber(Topic)来创建。 也可以用createDurableSubscriber来创建持久化的订阅者。
7)MessageListener
消息监听器:如果注册了消息监听器,一旦消息到达,自动调用onMessage方法。EJB中的MDB(Message Driven Bean)就是一种MessageLinstener
原生JMS-点对点
Producer:
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.112.129:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Queue queue = session.createQueue("queue01");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.发送消息
TextMessage test_message = session.createTextMessage("test message");
producer.send(test_message);
//8.释放资源
session.close();
connection.close();
}
Consumer:
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.112.129:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.指定目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Queue queue01 = session.createQueue("queue01");
//6.创建消息消费者
MessageConsumer consumer = session.createConsumer(queue01);
consumer.setMessageListener(message -> {
if(message instanceof TextMessage){
TextMessage txt = (TextMessage)message;
try {
System.out.println("接收的消息(2): " + txt.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
原生JMS-发布订阅
Producer:
//5.创建目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Topic topic = session.createTopic("topic01");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
Consumer:
//5.指定目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Topic topic = session.createTopic("topic01");
//6.创建消息消费者
MessageConsumer consumer = session.createConsumer(topic01);
springboot整合ActiveMQ
导入坐标
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Producer:
配置类application.yml
server:
port: 9001
spring:
application:
name: activemq-producer
# springboot 与 activemq的整合配置
# 连接工厂
activemq:
broker-url: tcp://192.168.112.129:61616
user: admin
password: admin
# 指定发送模式
jms:
pub-sub-domain: false
测试类 SpringbootProducer
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringbootProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Test
public void ptpSender(){
jmsMessagingTemplate.convertAndSend("springboot_queue", "springboot test -- queue");
}
}
Consumer:
依赖同Producer,配置同Producer
编写Listener
@Component
public class MsgListener {
@JmsListener(destination = "springboot_queue")
public void receiveMsg(Message message) throws JMSException {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
System.out.println("接收的消息: " + txtMsg.getText());
}
}
}
标签:Queue,Session,创建,Topic,session,消息,使用,ActiveMQ From: https://www.cnblogs.com/Fei-Gao/p/16827933.html