首页 > 编程语言 >开源消息中间件ActiveMQ回顾:Java客户端实现

开源消息中间件ActiveMQ回顾:Java客户端实现

时间:2023-09-14 15:03:41浏览次数:47  
标签:Java String void private 消息中间件 AMQ LOGGER ActiveMQ public


前一段时间工作中经常使用到Apache ActiveMQ用作消息传输。今天在公司不是很忙,于是又深入研究了一下,总结一下分享出来。

基于ActiveMQ的Java客户端实现例子。

接口定义:

public interface MQService {
    public void start();
    public void sendQueueMessage(String text) throws JMSException;
    public void publishTopicMessage(String text) throws JMSException;
    public void destroy();
}

实现类(部分代码折行了,大家将就着看吧):

public class ActiveMQServiceImpl implements MQService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQServiceImpl.class);
    private static MQService instance = null;
    
    private InitialContext initialContext;
    
    private QueueConnectionFactory queueConnectionFactory;
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    private Queue queue;
    private QueueSender queueSender;
    private QueueReceiver queueReceiver;
    
    private TopicConnectionFactory topicConnectionFactory;
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private Topic topic;
    private TopicPublisher topicPublisher;
    private TopicSubscriber topicSubscriber;
    
    protected ActiveMQServiceImpl() {
        try {
        	initialContext = new InitialContext(getInitalContextTable());
            initQueue();
            initTopic();
            LOGGER.info("AMQ init complete!");
        } catch (Exception e) {
            LOGGER.error("failed to connect mq:",e);
        }
    }
    
    private static Hashtable<String,String> getInitalContextTable(){
    	Hashtable<String,String> table=new Hashtable<String,String>();
    	table.put("java.naming.factory.initial", (String)ConfigProject.AMQ_CONFIG.getString("java.naming.factory.initial"));
    	table.put("java.naming.provider.url", (String)ConfigProject.AMQ_CONFIG.getString("java.naming.provider.url"));
    	return table;
    }
    
    public static MQService getInstance(){
    	if(instance==null){
    		instance = new ActiveMQServiceImpl();
    		return instance;
    	}
    	return instance;
    }
    
    protected void initQueue() throws Exception{
    	LOGGER.info("initQueue begin...");
    	//获取queueConnectionFactory,通过javax.jms.ConnectionFactory.
    	queueConnectionFactory = (QueueConnectionFactory) initialContext.lookup("ConnectionFactory");
    	
    	//创建queueConnection
    	if (ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal")!=null) {
            queueConnection = queueConnectionFactory.createQueueConnection(
            		(String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal"),
            		(String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.credentials")
            		);
        } else {
            queueConnection = queueConnectionFactory.createQueueConnection();
        }
        
        //设置queueConnection的clientId持久化消息
        String clientID = "queue@";
        try {
            clientID += InetAddress.getLocalHost();
        } catch (Exception e) {
            clientID += UUID.randomUUID().toString();
        }
        queueConnection.setClientID(clientID);
        
        //创建queueSession
        queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        
        //创建queue
        queue = queueSession.createQueue((String)ConfigProject.AMQ_CONFIG.getProperty("test.notification.queue"));
        
        //创建queueSender发送消息
        queueSender = queueSession.createSender(queue);
        //设置消息是否持久化:如果消费端没有接收到消息,设置持久化后即使AMQ服务器重启,消费端启动后也会收到消息.
        //如果设置非持久化,消息只保存在AMQ服务器内存中,一旦服务器重启,消费端将收不到消息.
        queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        //创建queueReceiver接收消息
        queueReceiver = queueSession.createReceiver(queue);
        queueReceiver.setMessageListener(new QueueReceiveListener());
        LOGGER.info("initQueue end...");
    }
    
    protected void initTopic() throws Exception{
    	LOGGER.info("initTopic begin...");
    	//获取topicConnectionFactory
        topicConnectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");
        //创建topicConnection
        if (ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal") != null) {
            topicConnection = topicConnectionFactory.createTopicConnection(
            		(String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal"),
            		(String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.credentials"));
        } else {
            topicConnection = topicConnectionFactory.createTopicConnection();
        }
        
        //设置clientId
        String clientID = "topic@";
        try {
            clientID += InetAddress.getLocalHost();
        } catch (Exception e) {
            clientID += UUID.randomUUID().toString();
        }
        topicConnection.setClientID(clientID);
        
        //创建topicSession
        topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建topic
        topic = topicSession.createTopic((String)ConfigProject.AMQ_CONFIG.getProperty("test.notification.topic"));
        
        //创建发布者
        topicPublisher = topicSession.createPublisher(topic);
        topicPublisher.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        //创建消费者
        topicSubscriber  = topicSession.createSubscriber(topic);
        topicSubscriber.setMessageListener(new TopicReciverListener());
        LOGGER.info("initTopic end...");
    }
    

    public void start() {
        if(queueConnection!=null)
        {
            try {
                queueConnection.start();
            } catch (JMSException e) {
            	LOGGER.error("failed to start queue connection!",e);
            }
        }
        if(topicConnection!=null)
        {
            try {
                topicConnection.start();
            } catch (JMSException e) {
            	LOGGER.error("failed to start topic connection!",e);
            }
        }
    }

    public void sendQueueMessage(String text) throws JMSException {
        queueSender.send(queueSession.createTextMessage(text));
    }
    
    public void publishTopicMessage(String text) throws JMSException {
    	topicPublisher.publish(topicSession.createTextMessage(text));
    }

    public void destroy() {
        try {
        	this.queueSender.close();
        	this.queueReceiver.close();
            this.queueSession.close();
            this.queueConnection.close();
            
            this.topicPublisher.close();
            this.topicSubscriber.close();
            this.topicSession.close();
            this.topicConnection.close();
            
            this.initialContext.close();
        } catch (NamingException e) {
            LOGGER.warn("failed to shutdown mq", e);
        } catch (Exception e) {
            LOGGER.warn("failed to shutdown mq", e);
        }
    }
}

上面程序中:

ConfigProject是个配置工具类,读取properties中的AMQ配置信息。

JNDI初始化InitialContext时一定要注意加载amq的InitialContextFactory和请求url地址。

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory

java.naming.provider.url=failover://(nio://1x.xx.xx.x3:61616,nio://1x.xx.xx.x4:61616)randomize=false

日志用的是logback+slf4j这2个开源组件。

QueueReceiveListener是队列消费端的监听器,TopicReciverListener是主题订阅者的监听器。

QueueReceiveListener.java:

/**
 * 消息监听类:
 * 如果需要回复,则可以作为内部类实现.
 */
public class QueueReceiveListener implements MessageListener{
	
	private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveListener.class);
	
	public void onMessage(Message message) {
		LOGGER.info("QueueReceiveListener onMessage begin...");
		try{
			if(message instanceof TextMessage){
				TextMessage textMessage = (TextMessage) message;
				LOGGER.info("textMessage:"+textMessage.getText());
			}
		}catch(Exception e){
			e.printStackTrace();
		}
		LOGGER.info("QueueReceiveListener onMessage end...");
	}
	
}

 TopicReciverListener.java:

public class TopicReciverListener implements MessageListener{
	
	private static final Logger LOGGER = LoggerFactory.getLogger(TopicReciverListener.class);
	
	public void onMessage(Message message) {
		LOGGER.info("TopicReciverListener onMessage begin...");
		try{
			if(message instanceof TextMessage){
				TextMessage textMessage = (TextMessage) message;
				LOGGER.info("textMessage:"+textMessage.getText());
			}
		}catch(Exception e){
			e.printStackTrace();
		}
		LOGGER.info("TopicReciverListener onMessage end...");
	}

}

同样,这两个监听类也可以通过内部类的形式写在实现类内部,例如需要回复收到的消息时。此处因为没有回复消息的功能,所以就单独写了一个类。

最后,定义一个工具类方便调用:

public class MQServiceUtils {
        //此处同样可以使用IoC框架注入
	private static MQService mqService = ActiveMQServiceImpl.getInstance();
	
	public static void sendMessage(String message){
		try{
			mqService.sendQueueMessage(message);
		}catch(Exception e){
			e.printStackTrace();
		}
	}
	
	public static void publishMessage(String message){
		try{
			mqService.publishTopicMessage(message);
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

 

 

附一些不错的参考资料:

http://www.iteye.com/topic/836509 基于ActiveMQ 的发布/订阅实例


http://wenku.baidu.com/view/a41c0a29bd64783e09122bbb.htmls  p2p/topic开发流程.


http://wenku.baidu.com/view/6c763565f5335a8102d22059.html  AMQ PPT文档.


http://wenku.baidu.com/view/4e63b6dbad51f01dc281f10d.html  AMQ in Action中文版


http://www.huaishao8.com/config/activemq/143.html AMQ使用详解(连载)


http://wenku.baidu.com/view/a41c0a29bd64783e09122bbb.htmls  ActiveMQ教程


http://jinguo.iteye.com/blog/233188 

标签:Java,String,void,private,消息中间件,AMQ,LOGGER,ActiveMQ,public
From: https://blog.51cto.com/u_6978506/7470426

相关文章

  • 不再困惑!Java中for循环的全面解析
    Java中的for循环是一种常用的循环结构,用于重复执行一段代码。它的基本语法如下:for(初始化语句;条件表达式;更新语句){//循环体代码}其中,初始化语句用于初始化循环控制变量;条件表达式用于判断是否继续循环;更新语句用于更新循环控制变量的值。具体来说,for循环的执行过程如下......
  • 分布式ActiveMQ集群
    回顾总结前一段时间学习的ActiveMQ分布式集群相关的知识,分享出来希望对看到的人有所帮助。一、分布式ActiveMQ集群的部署配置细节:官方资料:http://activemq.apache.org/clustering.html基本上看这个就足够了,本文就不具体分析配置文件了。1、Queueconsumerclusters:同一个queue,如果......
  • Java中ProcessBuilder使用
    可以使用java中的ProcessBuilder执行本地命令或脚本等工作:以下是一个简单的使用java调用本地python脚本的例子。从某工程代码中整理出来的,未封装,仅供参考。List<String>commands=newArrayList();commands.add("python");commands.add(pkg);commands.add("--ad=test");//...其......
  • 获取JavaApplication当前工程路径
    前日因工作中使用到日志和配置工具类,使相关信息输出文件中,因此总结了一下java中获取当前路径的方法(非web工程)。1、File类:Filefile=newFile(".");System.out.println(file.getCanonicalPath());//如果是..则返回上一级文件夹System.out.println(file.getAbsolut......
  • Java反序列化漏洞实现
    Java反序列化漏洞实现一、说明以前去面试被问反序列化的原理只是笼统地答在参数中注入一些代码当其反序列化时被执行,其实“一些代码”是什么代码“反序列化”时为什么就会被执行并不懂;反来在运营商做乙方经常会因为java反反序列化漏洞要升级commons.collections或给中间件打补丁......
  • Java图片剪裁功能实现
    目前一些社交型互联网应用都有一些上传图片(例如头像,照片等)对预览图进行剪裁的功能。前一段时间在工作也遇到这个问题,总结一下基本实现步骤及代码(包含图片放大,缩小,设置品质,对指定点区域剪裁功能),使用JPEG格式图片测试通过,其它格式图片尚未验证。一、基本步骤:1.将图片文件的InputS......
  • JMS规范与ActiveMQ简记
    前一段时间公司的产品中使用了ActiveMQ作为消息通知的工具,也简要记录了一些概念,整理后与大家分享一下(部分内容摘自网络,详见参考资料一栏)。 一、ActiveMQ是一个JMS规范的一个实现。在JMS中间主要定义了2种消息模式Point-to-Point(点对点)和Publich/SubscribeModel(发......
  • Java动态代理详解
    不定期整理硬盘内源代码、笔记、总结等,同时发上来分享一下。今天再发一篇关于Java动态代理的总结(貌似ItEye一天最多发5篇Blog,再多只能放草稿箱了?)-----------------------------------------------------------Java动态代理详解说到动态代理,顾名思义就是动态的代理(真是废话)。关......
  • 设计模式回顾之一:单例模式(Java的4种实现)
    设计模式回顾系列文章:主要针对工作中常用常见的设计模式进行整理、总结,同时分享以供大家拍砖。------------------------------------------------作为一个程序员,我并不知道"茴"字有4种写法。但是,我知道单例有4种写法。单例模式目的:保证一个类仅有一个实例,并提供一个访问它的全局访......
  • 无涯教程-JavaScript - ISREF函数
    描述如果指定的值是参考,则ISREF函数返回逻辑值TRUE。否则返回FALSE。语法ISREF(value)争论Argument描述Required/OptionalvalueAreferencetoacell.RequiredNotes您可以在执行任何操作之前使用此功能测试单元格的内容。适用性Excel2007,Excel2010,Excel......