
Setting Up an ActiveMQ Service

Date: 2023-02-19 16:46:17


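Starting the ActiveMQ server

1. First download the binary archive from https://activemq.apache.org/
Components -> ActiveMQ "Classic" -> download apache-activemq-5.9.0-bin.tar.gz (http://activemq.apache.org/activemq-590-release.html)
Alternatively, use wget with the direct link to the .tar.gz listed on that release page (running wget on the release-page URL itself only saves the HTML page, not the archive)
tar -zxvf apache-activemq-5.9.0-bin.tar.gz
cd apache-activemq-5.9.0/bin
./activemq start (this step failed for me)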

INFO: Loading '/etc/default/activemq'
INFO: Using java '/usr/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
./activemq: 396: ./activemq: /usr/bin/java -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Dhawtio.realm=activemq -Dhawtio.role=admins -Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal -Djava.security.auth.login.config=/home/bell/apache-activemq-5.9.0/conf/login.config  -Dcom.sun.management.jmxremote  -Djava.awt.headless=true -Djava.io.tmpdir="/home/bell/apache-activemq-5.9.0/tmp"                -Dactivemq.classpath="/home/bell/apache-activemq-5.9.0/conf;"               -Dactivemq.home="/home/bell/apache-activemq-5.9.0"               -Dactivemq.base="/home/bell/apache-activemq-5.9.0"               -Dactivemq.conf="/home/bell/apache-activemq-5.9.0/conf"               -Dactivemq.data="/home/bell/apache-activemq-5.9.0/data"                              -jar "/home/bell/apache-activemq-5.9.0/bin/activemq.jar" start >/dev/null 2>&1 &
              RET="$?"; APID="$!";
              echo $APID > /home/bell/apache-activemq-5.9.0/data/activemq-ubuntu.pid;
              echo "INFO: pidfile created : '/home/bell/apache-activemq-5.9.0/data/activemq-ubuntu.pid' (pid '$APID')";exit $RET: not found

Run uname -a to check the platform

Linux ubuntu 4.15.0-142-generic #146~16.04.1-Ubuntu SMP Tue Apr 13 09:27:15 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

Since this is a 64-bit x86 Linux system, go into bin/linux-x86-64 inside the ActiveMQ directory; it contains its own activemq script
Run ./activemq start

./activemq start
Starting ActiveMQ Broker...

To stop the broker, run ./activemq stop

Stopping ActiveMQ Broker...
Waiting for ActiveMQ Broker to exit...
Stopped ActiveMQ Broker.

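Use ps -elf | grep active to check that the broker is running in the background
By default ActiveMQ starts an embedded Jetty server on startup, which serves an admin application for monitoring ActiveMQ. It is reached at http://127.0.0.1:8161/admin/
Username: admin Password: admin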
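
Before moving on to the client library, it can help to confirm that the broker is actually listening. The following is a minimal sketch and not something shipped with ActiveMQ: canConnect is a hypothetical helper name, and it assumes an IPv4 address and a POSIX system. Port 8161 is the admin console mentioned above, and 61616 is the OpenWire port used by the client code later in this post.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical helper (not part of ActiveMQ or ActiveMQ-CPP):
// returns true if a TCP connection to ip:port can be established.
bool canConnect(const std::string& ip, std::uint16_t port) {
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        return false;
    }

    sockaddr_in addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);

    // inet_pton returns 1 only for a valid dotted-quad IPv4 address.
    if (::inet_pton(AF_INET, ip.c_str(), &addr.sin_addr) != 1) {
        ::close(fd);
        return false;
    }

    bool ok = ::connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0;
    ::close(fd);
    return ok;
}
```

For example, canConnect("127.0.0.1", 8161) and canConnect("127.0.0.1", 61616) should both return true once the broker has started with its default configuration; a false result suggests the start step did not succeed.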

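Installing the ActiveMQ-CPP library

Download the CPP source from the official site: Components -> CMS Client
Download and extract it
Read README.txt
sudo apt-get install autoconf
sudo apt-get install automake
sudo apt-get install libtool
Download the APR library from https://apr.apache.org/download.cgi
Download: apr-1.7.2.tar.gz
Extract: tar -zxvf apr-1.7.2.tar.gz
./configure; make; make test; make install (make install may need sudo)
By default it installs into /usr/local/apr
Install OpenSSL (needed for the SSL transport). I already had it installed; installation is simple, see its documentation. If you do not need SSL, pass --disable-ssl to configure
Install ActiveMQ-CPP
sudo apt-get install build-essential (Ubuntu)
Change into the extracted directory
./configure; make; make install; make test (optional)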

Testing in a new Qt project

The project needs to know where the libraries (*.so) and the include directories are located
Mine are /usr/include /usr/local/apr /usr/local/activemq
(in the Qt .pro file)

win32:CONFIG(release, debug|release): LIBS += -L$$PWD/../../../../usr/local/lib/release/ -lactivemq-cpp
else:win32:CONFIG(debug, debug|release): LIBS += -L$$PWD/../../../../usr/local/lib/debug/ -lactivemq-cpp
else:unix: LIBS += -L$$PWD/../../../../usr/local/lib/ -lactivemq-cpp

INCLUDEPATH += $$PWD/../../../../usr/include
DEPENDPATH += $$PWD/../../../../usr/include

win32:CONFIG(release, debug|release): LIBS += -L$$PWD/../../../../usr/local/apr/lib/release/ -lapr-1
else:win32:CONFIG(debug, debug|release): LIBS += -L$$PWD/../../../../usr/local/apr/lib/debug/ -lapr-1
else:unix: LIBS += -L$$PWD/../../../../usr/local/apr/lib/ -lapr-1

INCLUDEPATH += $$PWD/../../../../usr/include
DEPENDPATH += $$PWD/../../../../usr/include

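When compiling I hit errors about missing header files and library files, and linking failed. The main cause was that I had forgotten to add the library's include path and shared libraries to the project.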

Testing with plain C++

Create consumer.cpp and producer.cpp
Compile:
g++ consumer.cpp -o consumer -I /usr/include/activemq/ -L /usr/local/lib/ -lactivemq-cpp
g++ producer.cpp -o send -I /usr/include/activemq/ -L /usr/local/lib/ -lactivemq-cpp
Problems I ran into while compiling and running:

/usr/bin/ld: cannot find -lactivemq-cpp1 (the library name passed to -l was wrong; it should be -lactivemq-cpp)
./send: error while loading shared libraries: libactivemq-cpp.so.19: cannot open shared object file: No such file or directory
Locate libactivemq-cpp.so; it is in /usr/local/lib
sudo vim /etc/ld.so.conf and add that path to the file
Then refresh the linker cache: sudo /sbin/ldconfig -v

Code

#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>

using namespace activemq::core;
using namespace decaf::util::concurrent;
using namespace decaf::util;
using namespace decaf::lang;
using namespace cms;
using namespace std;

class HelloWorldProducer : public Runnable {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    int numMessages;
    bool useTopic;
    bool sessionTransacted;
    std::string brokerURI;

private:

    HelloWorldProducer(const HelloWorldProducer&);
    HelloWorldProducer& operator=(const HelloWorldProducer&);

public:

    HelloWorldProducer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        numMessages(numMessages),
        useTopic(useTopic),
        sessionTransacted(sessionTransacted),
        brokerURI(brokerURI) {
    }

    virtual ~HelloWorldProducer(){
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    virtual void run() {

        try {

            // Create a ConnectionFactory
            auto_ptr<ConnectionFactory> connectionFactory(
                ConnectionFactory::createCMSConnectionFactory(brokerURI));

            // Create a Connection
            connection = connectionFactory->createConnection();
            connection->start();

            // Create a Session
            if (this->sessionTransacted) {
                session = connection->createSession(Session::SESSION_TRANSACTED);
            } else {
                session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
            }

            // Create the destination (Topic or Queue)
            destination = session->createQueue("TEST.FOO");

            // Create a MessageProducer from the Session to the Topic or Queue
            producer = session->createProducer(destination);
            producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

            // Create the Thread Id String
            string threadIdStr = Long::toString(Thread::currentThread()->getId());

            // Create a messages
            string text = (string) "Hello world! from thread " + threadIdStr;

            for (int ix = 0; ix < numMessages; ++ix) {
                std::auto_ptr<TextMessage> message(session->createTextMessage(text));
                message->setIntProperty("Integer", ix);
                printf("Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str());
                producer->send(message.get());
            }

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

private:

    void cleanup() {

        if (connection != NULL) {
            try {
                connection->close();
            } catch (cms::CMSException& ex) {
                ex.printStackTrace();
            }
        }

        // Destroy resources.
        try {
            delete destination;
            destination = NULL;
            delete producer;
            producer = NULL;
            delete session;
            session = NULL;
            delete connection;
            connection = NULL;
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
};

class HelloWorldConsumer : public ExceptionListener,
                           public MessageListener,
                           public Runnable {

private:

    CountDownLatch latch;
    CountDownLatch doneLatch;
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    long waitMillis;
    bool useTopic;
    bool sessionTransacted;
    std::string brokerURI;

private:

    HelloWorldConsumer(const HelloWorldConsumer&);
    HelloWorldConsumer& operator=(const HelloWorldConsumer&);

public:

    HelloWorldConsumer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) :
        latch(1),
        doneLatch(numMessages),
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        waitMillis(waitMillis),
        useTopic(useTopic),
        sessionTransacted(sessionTransacted),
        brokerURI(brokerURI) {
    }

    virtual ~HelloWorldConsumer() {
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    void waitUntilReady() {
        latch.await();
    }

    virtual void run() {

        try {

            // Create a ConnectionFactory
            auto_ptr<ConnectionFactory> connectionFactory(
                ConnectionFactory::createCMSConnectionFactory(brokerURI));

            // Create a Connection
            connection = connectionFactory->createConnection();
            connection->start();
            connection->setExceptionListener(this);

            // Create a Session
            if (this->sessionTransacted == true) {
                session = connection->createSession(Session::SESSION_TRANSACTED);
            } else {
                session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
            }

            // Create the destination (Topic or Queue)

            destination = session->createQueue("TEST.FOO");


            // Create a MessageConsumer from the Session to the Topic or Queue
            consumer = session->createConsumer(destination);

            consumer->setMessageListener(this);

            std::cout.flush();
            std::cerr.flush();

            // Indicate we are ready for messages.
            latch.countDown();

            // Wait while asynchronous messages come in.
            doneLatch.await(waitMillis);

        } catch (CMSException& e) {
            // Indicate we are ready for messages.
            latch.countDown();
            e.printStackTrace();
        }
    }

    // Called from the consumer since this class is a registered MessageListener.
    virtual void onMessage(const Message* message) {

        static int count = 0;

        try {
            count++;
            const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message);
            string text = "";

            if (textMessage != NULL) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }

            printf("Message #%d Received: %s\n", count, text.c_str());

        } catch (CMSException& e) {
            e.printStackTrace();
        }

        // Commit all messages.
        if (this->sessionTransacted) {
            session->commit();
        }

        // No matter what, tag the count down latch until done.
        doneLatch.countDown();
    }

    // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        ex.printStackTrace();
        exit(1);
    }

private:

    void cleanup() {
        if (connection != NULL) {
            try {
                connection->close();
            } catch (cms::CMSException& ex) {
                ex.printStackTrace();
            }
        }

        // Destroy resources.
        try {
            delete destination;
            destination = NULL;
            delete consumer;
            consumer = NULL;
            delete session;
            session = NULL;
            delete connection;
            connection = NULL;
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
};

int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    activemq::library::ActiveMQCPP::initializeLibrary();
    {
    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";


    // Set the URI to point to the IP Address of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    // Wire Format Options:
    // =========================
    // Use either stomp or openwire, the default ports are different for each
    //
    // Examples:
    //    tcp://127.0.0.1:61616                      default to openwire
    //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
    //
    // SSL:
    // =========================
    // To use SSL you need to specify the location of the trusted Root CA or the
    // certificate for the broker you want to connect to.  Using the Root CA allows
    // you to use failover with multiple servers all using certificates signed by
    // the trusted root.  If using client authentication you also need to specify
    // the location of the client Certificate.
    //
    //     System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" );
    //     System::setProperty( "decaf.net.ssl.keyStorePassword", "password" );
    //     System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" );
    //
    // The you just specify the ssl transport in the URI, for example:
    //
    //     ssl://localhost:61617
    //
    std::string brokerURI =
        "failover:(tcp://localhost:61616)";

    //============================================================
    // Intended to select topics instead of queues. Note that both
    // run() methods above always call createQueue("TEST.FOO"), so
    // this flag currently has no effect.
    //============================================================
    bool useTopics = true;
    bool sessionTransacted = false;
    int numMessages = 2000;

    long long startTime = System::currentTimeMillis();

    HelloWorldProducer producer(brokerURI, numMessages, useTopics);
    HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted);

    // Start the consumer thread.
    Thread consumerThread(&consumer);
    consumerThread.start();

    // Wait for the consumer to indicate that its ready to go.
    consumer.waitUntilReady();

    // Start the producer thread.
    Thread producerThread(&producer);
    producerThread.start();

    // Wait for the threads to complete.
    producerThread.join();
    consumerThread.join();

    long long endTime = System::currentTimeMillis();
    double totalTime = (double)(endTime - startTime) / 1000.0;

    consumer.close();
    producer.close();

    std::cout << "Time to completion = " << totalTime << " seconds." << std::endl;
    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    }
    activemq::library::ActiveMQCPP::shutdownLibrary();
}

// END SNIPPET: demo
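
The example above relies on two decaf CountDownLatch objects: latch lets main() wait until the consumer has subscribed before the producer starts, and doneLatch lets the consumer wait, for at most waitMillis, until numMessages messages have arrived. To make the idea concrete, here is a small standard-C++ sketch of the same pattern. SimpleLatch is a made-up class for illustration only; it is not the decaf implementation and is not guaranteed to match its exact semantics.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical illustration of a count-down latch (not decaf's class).
class SimpleLatch {
public:
    explicit SimpleLatch(int count) : count_(count) {}

    // Decrement the counter; wake all waiters when it reaches zero.
    void countDown() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ > 0 && --count_ == 0) {
            cv_.notify_all();
        }
    }

    // Block until the counter reaches zero or the timeout expires.
    // Returns true if the counter is zero when the wait ends.
    bool await(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return cv_.wait_for(lock, timeout, [this] { return count_ == 0; });
    }

    // Current counter value, read under the lock.
    int getCount() {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    int count_;
};
```

In the program above, the consumer thread would play the role of the caller of countDown() (once when it is ready, and once per received message), while main() and the consumer's run() play the role of the waiters. The bounded wait is why the consumer returns after waitMillis even if fewer than numMessages messages arrive.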

producer.cpp

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
 
using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
 
 
class SimpleProducer : public Runnable {
private:
 
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI;
 
private:
 
    SimpleProducer( const SimpleProducer& );
    SimpleProducer& operator= ( const SimpleProducer& );
 
public:
 
    SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
                    const std::string& destURI, bool useTopic = false, bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        useTopic(useTopic),
        clientAck(clientAck),
        numMessages(numMessages),
        brokerURI(brokerURI),
        destURI(destURI) {
    }
 
    virtual ~SimpleProducer(){
        cleanup();
    }
 
    void close() {
        this->cleanup();
    }
 
    virtual void run() {
        try {
            auto_ptr<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory( brokerURI ) );

            try {
                connection = connectionFactory->createConnection();
                connection->start();
            } catch( CMSException& e ) {
                e.printStackTrace();
                throw;  // rethrow the original exception without copying or slicing it
            }

            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }

            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }
 
            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
 
            string threadIdStr = Long::toString( Thread::currentThread()->getId() );
            string text = (string)"Hello world! from thread " + threadIdStr;
 
            for( unsigned int ix=0; ix<numMessages; ++ix ) {
                TextMessage* message = session->createTextMessage( text );
                message->setIntProperty( "Integer", ix );
                // ix is unsigned, so print it with %u
                printf( "Sent message #%u from thread %s\n", ix+1, threadIdStr.c_str() );
                producer->send( message );
                delete message;
            }
 
        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }
 
private:
 
    void cleanup(){
        try{
            if( destination != NULL ) delete destination;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        destination = NULL;
 
        try{
            if( producer != NULL ) delete producer;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        producer = NULL;
 
        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch ( CMSException& e ) { e.printStackTrace(); }
 
        try{
            if( session != NULL ) delete session;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        session = NULL;
 
        try{
            if( connection != NULL ) delete connection;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        connection = NULL;
    }
};
 
int main(int argc , char* argv[]) 
{
 
    activemq::library::ActiveMQCPP::initializeLibrary();
    std::cout << "=====================================================\n";
    std::cout << "Starting produce message:" << std::endl;
    std::cout << "-----------------------------------------------------\n";
 
    std::string brokerURI ="failover://(tcp://127.0.0.1:61616)";
    unsigned int numMessages = 2000;
    std::string destURI = "test.chen";
 
    bool useTopics = false;
    SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
    producer.run();
    producer.close();
 
    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished test" << std::endl;
    std::cout << "=====================================================\n";
 
    activemq::library::ActiveMQCPP::shutdownLibrary();
}
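
In the send loop above, each message is created with createTextMessage and released with delete after send; if send throws, that delete is skipped and the message leaks. One possible alternative is to let std::unique_ptr own the message. The sketch below uses hypothetical stand-ins (FakeMessage, createTextMessage, send, sendAll) instead of the real CMS classes so that it compiles on its own; it only illustrates the ownership pattern.

```cpp
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for cms::TextMessage; counts live instances
// so that leaks become visible.
struct FakeMessage {
    static int live;
    std::string text;
    explicit FakeMessage(const std::string& t) : text(t) { ++live; }
    ~FakeMessage() { --live; }
};
int FakeMessage::live = 0;

// Hypothetical stand-in for session->createTextMessage(text):
// the caller owns the returned raw pointer.
FakeMessage* createTextMessage(const std::string& text) {
    return new FakeMessage(text);
}

// Hypothetical stand-in for producer->send(message); can be made to fail.
void send(const FakeMessage* message, bool fail) {
    if (fail || message == nullptr) {
        throw std::runtime_error("send failed");
    }
}

// Sends n messages; throws on message index failAt (use -1 for no failure).
// unique_ptr frees each message on both the normal and the exception path.
int sendAll(int n, int failAt) {
    int sent = 0;
    for (int i = 0; i < n; ++i) {
        std::unique_ptr<FakeMessage> message(createTextMessage("Hello world!"));
        send(message.get(), i == failAt);
        ++sent;
    }
    return sent;
}
```

With the real library, the same shape would presumably be std::unique_ptr<TextMessage> message(session->createTextMessage(text)); followed by producer->send(message.get());, which is what the earlier HelloWorldProducer does with auto_ptr.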

consumer.cpp

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
 
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
 
 
class SimpleAsyncConsumer : public ExceptionListener,
                            public MessageListener,
                            public DefaultTransportListener {
private:
 
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    bool useTopic;
    std::string brokerURI;
    std::string destURI;
    bool clientAck;
 
private:
 
    SimpleAsyncConsumer( const SimpleAsyncConsumer& );
    SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
 
public:
 
    SimpleAsyncConsumer( const std::string& brokerURI,
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        useTopic(useTopic),
        brokerURI(brokerURI),
        destURI(destURI),
        clientAck(clientAck) {
    }
 
    virtual ~SimpleAsyncConsumer() {
        this->cleanup();
    }
 
    void close() {
        this->cleanup();
    }
 
    void runConsumer() {
 
        try {
            ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI );
            connection = connectionFactory->createConnection();
            delete connectionFactory;
 
            ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
            if( amqConnection != NULL ) {
                amqConnection->addTransportListener( this );
            }

            connection->start();
            connection->setExceptionListener(this);

            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }
 
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }
 
            consumer = session->createConsumer( destination );
            consumer->setMessageListener( this );
 
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
 
    virtual void onMessage( const Message* message ) {
        static int count = 0;
        try
        {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = "";
 
            if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }
 
            if( clientAck ) {
                message->acknowledge();
            }
 
            printf( "Message #%d Received: %s\n", count, text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
 
    virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        exit(1);
    }
 
    virtual void transportInterrupted() {
        std::cout << "The Connection's Transport has been Interrupted." << std::endl;
    }
 
    virtual void transportResumed() {
        std::cout << "The Connection's Transport has been Restored." << std::endl;
    }
 
private:
 
    void cleanup(){
        try{
            if( destination != NULL ) delete destination;
        }catch (CMSException& e) {}
        destination = NULL;
 
        try{
            if( consumer != NULL ) delete consumer;
        }catch (CMSException& e) {}
        consumer = NULL;
 
        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch (CMSException& e) {}
 
        try{
            if( session != NULL ) delete session;
        }catch (CMSException& e) {}
        session = NULL;
 
        try{
            if( connection != NULL ) delete connection;
        }catch (CMSException& e) {}
        connection = NULL;
    }
};
 
int main(int argc, char* argv[]) {
 
    activemq::library::ActiveMQCPP::initializeLibrary();
 
    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";
 
    std::string brokerURI = "failover:(tcp://127.0.0.1:61616)";
 
    std::string destURI = "test.chen"; 
    bool useTopics = false;
    bool clientAck = false;
    SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
    consumer.runConsumer();
    std::cout << "Press 'q' to quit" << std::endl;
    while( std::cin.get() != 'q') {}
    consumer.close();
 
    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";
 
    activemq::library::ActiveMQCPP::shutdownLibrary();
}
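
Both programs connect with a failover URI that wraps a single tcp:// address. The failover transport also accepts a comma-separated list of broker addresses inside the parentheses. As a small sketch, a hypothetical helper (makeFailoverUri is not part of ActiveMQ-CPP) could assemble such a URI from host:port strings:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: builds "failover:(tcp://h1:p1,tcp://h2:p2)"
// from a list of "host:port" strings. No validation is performed.
std::string makeFailoverUri(const std::vector<std::string>& hostPorts) {
    std::string uri = "failover:(";
    for (std::size_t i = 0; i < hostPorts.size(); ++i) {
        if (i > 0) {
            uri += ",";
        }
        uri += "tcp://" + hostPorts[i];
    }
    uri += ")";
    return uri;
}
```

Called with the single entry "127.0.0.1:61616", this produces the same brokerURI string that consumer.cpp uses above.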

Output

Qt

=====================================================
Starting the example:test 123
-----------------------------------------------------
help 123
Sent message #1 from thread 140724209268712
Sent message #2 from thread 140724209268712
Sent message #3 from thread 140724209268712
Sent message #4 from thread 140724209268712
Sent message #5 from thread 140724209268712
Sent message #6 from thread 140724209268712
Sent message #7 from thread 140724209268712
Sent message #8 from thread 140724209268712
.....
Sent message #170 from thread 140724209268712
Sent message #171 from thread 140724209268712
Message #61 Received: Hello world! from thread 140724209268712
Message #62 Received: Hello world! from thread 140724209268712
Message #63 Received: Hello world! from thread 140724209268712
Message #64 Received: Hello world! from thread 140724209268712
Message #65 Received: Hello world! from thread 140724209268712
.....
Message #75 Received: Hello world! from thread 140724209268712
Message #76 Received: Hello world! from thread 140724209268712
Sent message #172 from thread 140724209268712
Message #77 Received: Hello world! from thread 140724209268712
Message #78 Received: Hello world! from thread 140724209268712
Sent message #173 from thread 140724209268712
Sent message #174 from thread 140724209268712
Message #79 Received: Hello world! from thread 140724209268712
Sent message #175 from thread 140724209268712
Message #80 Received: Hello world! from thread 140724209268712
Sent message #1338 from thread 140724209268712
Message #339 Received: Hello world! from thread 140724209268712
Sent message #1339 from thread 140724209268712
Message #340 Received: Hello world! from thread 140724209268712
Message #341 Received: Hello world! from thread 140724209268712
Message #342 Received: Hello world! from thread 140724209268712
Message #343 Received: Hello world! from thread 140724209268712
Message #344 Received: Hello world! from thread 140724209268712
.........
Message #356 Received: Hello world! from thread 140724209268712
Message #357 Received: Hello world! from thread 140724209268712
Sent message #1631 from thread 140724209268712
Message #402 Received: Hello world! from thread 140724209268712
Sent message #1632 from thread 140724209268712
Sent message #1633 from thread 140724209268712
Sent message #1634 from thread 140724209268712
Sent message #1635 from thread 140724209268712
Sent message #1636 from thread 140724209268712
Sent message #1637 from thread 140724209268712
Sent message #1638 from thread 140724209268712
Sent message #1639 from thread 140724209268712
Message #585 Received: Hello world! from thread 140724209268712
Message #586 Received: Hello world! from thread 140724209268712
Message #587 Received: Hello world! from thread 140724209268712
Message #1999 Received: Hello world! from thread 140724209268712
Message #2000 Received: Hello world! from thread 140724209268712
Time to completion = 0.258 seconds.
-----------------------------------------------------
Finished with the example.
=====================================================
16:24:44: /home/bell/桌面/build-Test_send_recv_info_client-Desktop_Qt_5_13_0_GCC_64bit-Debug/Test_send_recv_info_client exited with code 1

C++ consumer.cpp

./consumer 
=====================================================
Starting the example:
-----------------------------------------------------
The Connection's Transport has been Restored.
Press 'q' to quit
Message #1 Received: Hello world! from thread 140517332345072
Message #2 Received: Hello world! from thread 140517332345072
Message #3 Received: Hello world! from thread 140517332345072
Message #4 Received: Hello world! from thread 140517332345072
Message #5 Received: Hello world! from thread 140517332345072
Message #6 Received: Hello world! from thread 140517332345072
Message #7 Received: Hello world! from thread 140517332345072
Message #8 Received: Hello world! from thread 140517332345072
Message #9 Received: Hello world! from thread 140517332345072
Message #10 Received: Hello world! from thread 140517332345072
Message #11 Received: Hello world! from thread 140517332345072
Message #12 Received: Hello world! from thread 140517332345072
Message #13 Received: Hello world! from thread 140517332345072
Message #14 Received: Hello world! from thread 140517332345072
Message #15 Received: Hello world! from thread 140517332345072
Message #16 Received: Hello world! from thread 140517332345072
Message #17 Received: Hello world! from thread 140517332345072
Message #18 Received: Hello world! from thread 140517332345072
Message #19 Received: Hello world! from thread 140517332345072

C++ producer.cpp

Sent message #1986 from thread 140517332345072
Sent message #1987 from thread 140517332345072
Sent message #1988 from thread 140517332345072
Sent message #1989 from thread 140517332345072
Sent message #1990 from thread 140517332345072
Sent message #1991 from thread 140517332345072
Sent message #1992 from thread 140517332345072
Sent message #1993 from thread 140517332345072
Sent message #1994 from thread 140517332345072
Sent message #1995 from thread 140517332345072
Sent message #1996 from thread 140517332345072
Sent message #1997 from thread 140517332345072
Sent message #1998 from thread 140517332345072
Sent message #1999 from thread 140517332345072
Sent message #2000 from thread 140517332345072
-----------------------------------------------------
Finished test
=====================================================


From: https://www.cnblogs.com/bell123/p/17134987.html
