ActiveMQ服务搭建
启动activeMQ服务器
1首先要下载bin压缩文件 https://activemq.apache.org/
Components -> ActiveMQ "Classic" -> 下载 apache-activemq-5.9.0-bin.tar.gz (http://activemq.apache.org/activemq-590-release.html)
或者 wget http://activemq.apache.org/activemq-590-release.html
tar -zxvf apache-activemq-5.9.0-bin.tar.gz
./activemq start(我在这个地方执行失败了)
INFO: Loading '/etc/default/activemq'
INFO: Using java '/usr/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
./activemq: 396: ./activemq: /usr/bin/java -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Dhawtio.realm=activemq -Dhawtio.role=admins -Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal -Djava.security.auth.login.config=/home/bell/apache-activemq-5.9.0/conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir="/home/bell/apache-activemq-5.9.0/tmp" -Dactivemq.classpath="/home/bell/apache-activemq-5.9.0/conf;" -Dactivemq.home="/home/bell/apache-activemq-5.9.0" -Dactivemq.base="/home/bell/apache-activemq-5.9.0" -Dactivemq.conf="/home/bell/apache-activemq-5.9.0/conf" -Dactivemq.data="/home/bell/apache-activemq-5.9.0/data" -jar "/home/bell/apache-activemq-5.9.0/bin/activemq.jar" start >/dev/null 2>&1 &
RET="$?"; APID="$!";
echo $APID > /home/bell/apache-activemq-5.9.0/data/activemq-ubuntu.pid;
echo "INFO: pidfile created : '/home/bell/apache-activemq-5.9.0/data/activemq-ubuntu.pid' (pid '$APID')";exit $RET: not found
执行 uname -a
Linux ubuntu 4.15.0-142-generic #146~16.04.1-Ubuntu SMP Tue Apr 13 09:27:15 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
接着进入 /bin/linux-x86-64 里面有 activemq
执行 ./activemq start
./activemq start
Starting ActiveMQ Broker...
如果要停止 ./activemq stop
Stopping ActiveMQ Broker...
Waiting for ActiveMQ Broker to exit...
Stopped ActiveMQ Broker.
ps -elf | grep active 查看后台运行状态
ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。 访问链接形式:http://127.0.0.1:8161/admin/
用户名:admin 密码: admin
安装activeMQ CPP库
从官网下载 CPP源码 Components -> CMS Client
下载解压
查看README.txt
sudo apt-get install autoconf
sudo apt-get install automake
sudo apt-get install libtool
下载APR库 https://apr.apache.org/download.cgi
下载:apr-1.7.2.tar.gz
解压:tar -zxvf apr-1.7.2.tar.gz
./configure; make; make test; make install
默认安装在 /usr/local/apr里
安装openssl(需要用到ssl传输功能) 这里我已经装过了,小伙伴们可以自己查查相关文档,安装很简单 不要这个功能 --disable-ssl
安装 activeMQ-CPP
sudo apt-get install build-essential (ubuntu)
进入到解压目录里
./configure; make; make install; make test(可以省略)
在Qt中新建项目测试
在这里需要加载lib(*.so)目录所在位置 include目录所在位置
我的是 /usr/include /usr/local/apr /usr/local/activemq
(QT .pro文件中)
win32:CONFIG(release, debug|release): LIBS += -L$$PWD/../../../../usr/local/lib/release/ -lactivemq-cpp
else:win32:CONFIG(debug, debug|release): LIBS += -L$$PWD/../../../../usr/local/lib/debug/ -lactivemq-cpp
else:unix: LIBS += -L$$PWD/../../../../usr/local/lib/ -lactivemq-cpp
INCLUDEPATH += $$PWD/../../../../usr/include
DEPENDPATH += $$PWD/../../../../usr/include
win32:CONFIG(release, debug|release): LIBS += -L$$PWD/../../../../usr/local/apr/lib/release/ -lapr-1
else:win32:CONFIG(debug, debug|release): LIBS += -L$$PWD/../../../../usr/local/apr/lib/debug/ -lapr-1
else:unix: LIBS += -L$$PWD/../../../../usr/local/apr/lib/ -lapr-1
INCLUDEPATH += $$PWD/../../../../usr/include
DEPENDPATH += $$PWD/../../../../usr/include
我在编译的时候遇到报错 找不到头文件 和库文件 链接出现问题 主要是忘了包含库的头文件和动态库
新建C++测试
新建一个consumer.cpp 和 producer.cpp
编译
g++ consumer.cpp -o consumer -I /usr/include/activemq/ -L /usr/local/lib/ -lactivemq-cpp
g++ producer.cpp -o send -I /usr/include/activemq/ -L /usr/local/lib/ -lactivemq-cpp
编译的时候出现过的问题
/usr/bin/ld: 找不到 -lactivemq-cpp1 链接名字有问题
./send: error while loading shared libraries: libactivemq-cpp.so.19: cannot open shared object file: No such file or directory
把libactivemq-cpp.so文件找到 在/usr/local/lib里
sudo vim /etc/ld.so.conf 在里面把这个路径加上
再用命令刷新:sudo /sbin/ldconfig -v
代码
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
using namespace activemq::core;
using namespace decaf::util::concurrent;
using namespace decaf::util;
using namespace decaf::lang;
using namespace cms;
using namespace std;
class HelloWorldProducer : public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
int numMessages;
bool useTopic;
bool sessionTransacted;
std::string brokerURI;
private:
HelloWorldProducer(const HelloWorldProducer&);
HelloWorldProducer& operator=(const HelloWorldProducer&);
public:
HelloWorldProducer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false) :
connection(NULL),
session(NULL),
destination(NULL),
producer(NULL),
numMessages(numMessages),
useTopic(useTopic),
sessionTransacted(sessionTransacted),
brokerURI(brokerURI) {
}
virtual ~HelloWorldProducer(){
cleanup();
}
void close() {
this->cleanup();
}
virtual void run() {
try {
// Create a ConnectionFactory
auto_ptr<ConnectionFactory> connectionFactory(
ConnectionFactory::createCMSConnectionFactory(brokerURI));
// Create a Connection
connection = connectionFactory->createConnection();
connection->start();
// Create a Session
if (this->sessionTransacted) {
session = connection->createSession(Session::SESSION_TRANSACTED);
} else {
session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
}
// Create the destination (Topic or Queue)
destination = session->createQueue("TEST.FOO");
// Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer(destination);
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
// Create the Thread Id String
string threadIdStr = Long::toString(Thread::currentThread()->getId());
// Create a messages
string text = (string) "Hello world! from thread " + threadIdStr;
for (int ix = 0; ix < numMessages; ++ix) {
std::auto_ptr<TextMessage> message(session->createTextMessage(text));
message->setIntProperty("Integer", ix);
printf("Sent message #%d from thread %s
", ix + 1, threadIdStr.c_str());
producer->send(message.get());
}
} catch (CMSException& e) {
e.printStackTrace();
}
}
private:
void cleanup() {
if (connection != NULL) {
try {
connection->close();
} catch (cms::CMSException& ex) {
ex.printStackTrace();
}
}
// Destroy resources.
try {
delete destination;
destination = NULL;
delete producer;
producer = NULL;
delete session;
session = NULL;
delete connection;
connection = NULL;
} catch (CMSException& e) {
e.printStackTrace();
}
}
};
class HelloWorldConsumer : public ExceptionListener,
public MessageListener,
public Runnable {
private:
CountDownLatch latch;
CountDownLatch doneLatch;
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
long waitMillis;
bool useTopic;
bool sessionTransacted;
std::string brokerURI;
private:
HelloWorldConsumer(const HelloWorldConsumer&);
HelloWorldConsumer& operator=(const HelloWorldConsumer&);
public:
HelloWorldConsumer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) :
latch(1),
doneLatch(numMessages),
connection(NULL),
session(NULL),
destination(NULL),
consumer(NULL),
waitMillis(waitMillis),
useTopic(useTopic),
sessionTransacted(sessionTransacted),
brokerURI(brokerURI) {
}
virtual ~HelloWorldConsumer() {
cleanup();
}
void close() {
this->cleanup();
}
void waitUntilReady() {
latch.await();
}
virtual void run() {
try {
// Create a ConnectionFactory
auto_ptr<ConnectionFactory> connectionFactory(
ConnectionFactory::createCMSConnectionFactory(brokerURI));
// Create a Connection
connection = connectionFactory->createConnection();
connection->start();
connection->setExceptionListener(this);
// Create a Session
if (this->sessionTransacted == true) {
session = connection->createSession(Session::SESSION_TRANSACTED);
} else {
session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
}
// Create the destination (Topic or Queue)
destination = session->createQueue("TEST.FOO");
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer(destination);
consumer->setMessageListener(this);
std::cout.flush();
std::cerr.flush();
// Indicate we are ready for messages.
latch.countDown();
// Wait while asynchronous messages come in.
doneLatch.await(waitMillis);
} catch (CMSException& e) {
// Indicate we are ready for messages.
latch.countDown();
e.printStackTrace();
}
}
// Called from the consumer since this class is a registered MessageListener.
virtual void onMessage(const Message* message) {
static int count = 0;
try {
count++;
const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message);
string text = "";
if (textMessage != NULL) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
printf("Message #%d Received: %s
", count, text.c_str());
} catch (CMSException& e) {
e.printStackTrace();
}
// Commit all messages.
if (this->sessionTransacted) {
session->commit();
}
// No matter what, tag the count down latch until done.
doneLatch.countDown();
}
// If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
printf("CMS Exception occurred. Shutting down client.
");
ex.printStackTrace();
exit(1);
}
private:
void cleanup() {
if (connection != NULL) {
try {
connection->close();
} catch (cms::CMSException& ex) {
ex.printStackTrace();
}
}
// Destroy resources.
try {
delete destination;
destination = NULL;
delete consumer;
consumer = NULL;
delete session;
session = NULL;
delete connection;
connection = NULL;
} catch (CMSException& e) {
e.printStackTrace();
}
}
};
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
activemq::library::ActiveMQCPP::initializeLibrary();
{
std::cout << "=====================================================
";
std::cout << "Starting the example:" << std::endl;
std::cout << "-----------------------------------------------------
";
// Set the URI to point to the IP Address of your broker.
// add any optional params to the url to enable things like
// tightMarshalling or tcp logging etc. See the CMS web site for
// a full list of configuration options.
//
// http://activemq.apache.org/cms/
//
// Wire Format Options:
// =========================
// Use either stomp or openwire, the default ports are different for each
//
// Examples:
// tcp://127.0.0.1:61616 default to openwire
// tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead
//
// SSL:
// =========================
// To use SSL you need to specify the location of the trusted Root CA or the
// certificate for the broker you want to connect to. Using the Root CA allows
// you to use failover with multiple servers all using certificates signed by
// the trusted root. If using client authentication you also need to specify
// the location of the client Certificate.
//
// System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" );
// System::setProperty( "decaf.net.ssl.keyStorePassword", "password" );
// System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" );
//
// The you just specify the ssl transport in the URI, for example:
//
// ssl://localhost:61617
//
std::string brokerURI =
"failover:(tcp://localhost:61616)";
//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in both consumer an producer.
//============================================================
bool useTopics = true;
bool sessionTransacted = false;
int numMessages = 2000;
long long startTime = System::currentTimeMillis();
HelloWorldProducer producer(brokerURI, numMessages, useTopics);
HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted);
// Start the consumer thread.
Thread consumerThread(&consumer);
consumerThread.start();
// Wait for the consumer to indicate that its ready to go.
consumer.waitUntilReady();
// Start the producer thread.
Thread producerThread(&producer);
producerThread.start();
// Wait for the threads to complete.
producerThread.join();
consumerThread.join();
long long endTime = System::currentTimeMillis();
double totalTime = (double)(endTime - startTime) / 1000.0;
consumer.close();
producer.close();
std::cout << "Time to completion = " << totalTime << " seconds." << std::endl;
std::cout << "-----------------------------------------------------
";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================
";
}
activemq::library::ActiveMQCPP::shutdownLibrary();
}
// END SNIPPET: demo
producer.cpp
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleProducer : public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
unsigned int numMessages;
std::string brokerURI;
std::string destURI;
private:
SimpleProducer( const SimpleProducer& );
SimpleProducer& operator= ( const SimpleProducer& );
public:
SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
const std::string& destURI, bool useTopic = false, bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
producer(NULL),
useTopic(useTopic),
clientAck(clientAck),
numMessages(numMessages),
brokerURI(brokerURI),
destURI(destURI) {
}
virtual ~SimpleProducer(){
cleanup();
}
void close() {
this->cleanup();
}
virtual void run() {
try {
auto_ptr<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory( brokerURI ) );
try{
connection = connectionFactory->createConnection();
connection->start();
} catch( CMSException& e ) {
e.printStackTrace();
throw e;
}
if( clientAck )
{
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else
{
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
if( useTopic )
{
destination = session->createTopic( destURI );
} else
{
destination = session->createQueue( destURI );
}
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
string threadIdStr = Long::toString( Thread::currentThread()->getId() );
string text = (string)"Hello world! from thread " + threadIdStr;
for( unsigned int ix=0; ix<numMessages; ++ix )
{
TextMessage* message = session->createTextMessage( text );
message->setIntProperty( "Integer", ix );
printf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) { e.printStackTrace(); }
destination = NULL;
try
{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) { e.printStackTrace(); }
producer = NULL;
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) { e.printStackTrace(); }
try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) { e.printStackTrace(); }
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) { e.printStackTrace(); }
connection = NULL;
}
};
int main(int argc , char* argv[])
{
activemq::library::ActiveMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting produce message:" << std::endl;
std::cout << "-----------------------------------------------------\n";
std::string brokerURI ="failover://(tcp://127.0.0.1:61616)";
unsigned int numMessages = 2000;
std::string destURI = "test.chen";
bool useTopics = false;
SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
producer.run();
producer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished test" << std::endl;
std::cout << "=====================================================\n";
activemq::library::ActiveMQCPP::shutdownLibrary();
}
consumer.cpp
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleAsyncConsumer : public ExceptionListener,
public MessageListener,
public DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
bool useTopic;
std::string brokerURI;
std::string destURI;
bool clientAck;
private:
SimpleAsyncConsumer( const SimpleAsyncConsumer& );
SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
public:
SimpleAsyncConsumer( const std::string& brokerURI,
const std::string& destURI,
bool useTopic = false,
bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
consumer(NULL),
useTopic(useTopic),
brokerURI(brokerURI),
destURI(destURI),
clientAck(clientAck) {
}
virtual ~SimpleAsyncConsumer() {
this->cleanup();
}
void close() {
this->cleanup();
}
void runConsumer() {
try {
ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI );
connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
if( amqConnection != NULL )
{
amqConnection->addTransportListener( this );
}
connection->start();
connection->setExceptionListener(this);
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onMessage( const Message* message ) {
static int count = 0;
try
{
count++;
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
if( clientAck ) {
message->acknowledge();
}
printf( "Message #%d Received: %s\n", count, text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
printf("CMS Exception occurred. Shutting down client.\n");
exit(1);
}
virtual void transportInterrupted() {
std::cout << "The Connection's Transport has been Interrupted." << std::endl;
}
virtual void transportResumed() {
std::cout << "The Connection's Transport has been Restored." << std::endl;
}
private:
void cleanup(){
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) {}
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) {}
consumer = NULL;
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) {}
try{
if( session != NULL ) delete session;
}catch (CMSException& e) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) {}
connection = NULL;
}
};
int main(int argc, char* argv[]) {
activemq::library::ActiveMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting the example:" << std::endl;
std::cout << "-----------------------------------------------------\n";
std::string brokerURI = "failover:(tcp://127.0.0.1:61616)";
std::string destURI = "test.chen";
bool useTopics = false;
bool clientAck = false;
SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
consumer.runConsumer();
std::cout << "Press 'q' to quit" << std::endl;
while( std::cin.get() != 'q') {}
consumer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";
activemq::library::ActiveMQCPP::shutdownLibrary();
}
输出
qt
=====================================================Starting the example:test 123
-----------------------------------------------------help 123
Sent message #1 from thread 140724209268712Sent message #2 from thread 140724209268712Sent message #3 from thread 140724209268712Sent message #4 from thread 140724209268712Sent message #5 from thread 140724209268712Sent message #6 from thread 140724209268712Sent message #7 from thread 140724209268712Sent message #8 from thread 140724209268712Sent message ..... #170 from thread 140724209268712Sent message #171 from thread 140724209268712Message #61 Received: Hello world! from thread 140724209268712Message #62 Received: Hello world! from thread 140724209268712Message #63 Received: Hello world! from thread 140724209268712Message #64 Received: Hello world! from thread 140724209268712Message #65 Received: Hello world! from thread .....
140724209268712Message #75 Received: Hello world! from thread 140724209268712Message #76 Received: Hello world! from thread 140724209268712Sent message #172 from thread 140724209268712Message #77 Received: Hello world! from thread 140724209268712Message #78 Received: Hello world! from thread 140724209268712Sent message #173 from thread 140724209268712Sent message #174 from thread 140724209268712Message #79 Received: Hello world! from thread 140724209268712Sent message #175 from thread 140724209268712Message #80 Received: Hello world! from thread 140724209268712Sent message #1338 from thread 140724209268712Message #339 Received: Hello world! from thread 140724209268712Sent message #1339 from thread 140724209268712Message #340 Received: Hello world! from thread 140724209268712Message #341 Received: Hello world! from thread 140724209268712Message #342 Received: Hello world! from thread 140724209268712Message #343 Received: Hello world! from thread 140724209268712Message #344 Received: Hello world! from thread
.........
140724209268712Message #356 Received: Hello world! from thread 140724209268712Message #357 Received: Hello world! from thread 140724209268712Sent message#1631 from thread 140724209268712Message #402 Received: Hello world! from thread 140724209268712Sent message #1632 from thread 140724209268712Sent message #1633 from thread 140724209268712Sent message #1634 from thread 140724209268712Sent message #1635 from thread 140724209268712Sent message #1636 from thread 140724209268712Sent message #1637 from thread 140724209268712Sent message #1638 from thread 140724209268712Sent message #1639 from thread 140724209268712Message #585 Received: Hello world! from thread 140724209268712Message #586 Received: Hello world! from thread 140724209268712Message #587 Received: Hello world! from thread 140724209268712Message #1999 Received: Hello world! from thread 140724209268712Message #2000 Received: Hello world! from thread 140724209268712Time to completion = 0.258 seconds.-----------------------------------------------------Finished with the example.
=====================================================16:24:44: /home/bell/桌面/build-Test_send_recv_info_client-Desktop_Qt_5_13_0_GCC_64bit-Debug/Test_send_recv_info_client exited with code 1
C++ producer.cpp
./consumer
=====================================================
Starting the example:
-----------------------------------------------------
The Connection's Transport has been Restored.
Press 'q' to quit
Message #1 Received: Hello world! from thread 140517332345072
Message #2 Received: Hello world! from thread 140517332345072
Message #3 Received: Hello world! from thread 140517332345072
Message #4 Received: Hello world! from thread 140517332345072
Message #5 Received: Hello world! from thread 140517332345072
Message #6 Received: Hello world! from thread 140517332345072
Message #7 Received: Hello world! from thread 140517332345072
Message #8 Received: Hello world! from thread 140517332345072
Message #9 Received: Hello world! from thread 140517332345072
Message #10 Received: Hello world! from thread 140517332345072
Message #11 Received: Hello world! from thread 140517332345072
Message #12 Received: Hello world! from thread 140517332345072
Message #13 Received: Hello world! from thread 140517332345072
Message #14 Received: Hello world! from thread 140517332345072
Message #15 Received: Hello world! from thread 140517332345072
Message #16 Received: Hello world! from thread 140517332345072
Message #17 Received: Hello world! from thread 140517332345072
Message #18 Received: Hello world! from thread 140517332345072
Message #19 Received: Hello world! from thread 140517332345072
C++ consumer.cpp
Sent message #1986 from thread 140517332345072
Sent message #1987 from thread 140517332345072
Sent message #1988 from thread 140517332345072
Sent message #1989 from thread 140517332345072
Sent message #1990 from thread 140517332345072
Sent message #1991 from thread 140517332345072
Sent message #1992 from thread 140517332345072
Sent message #1993 from thread 140517332345072
Sent message #1994 from thread 140517332345072
Sent message #1995 from thread 140517332345072
Sent message #1996 from thread 140517332345072
Sent message #1997 from thread 140517332345072
Sent message #1998 from thread 140517332345072
Sent message #1999 from thread 140517332345072
Sent message #2000 from thread 140517332345072
-----------------------------------------------------
Finished test
=====================================================
参考文档
https://www.codenong.com/jsf80ebb497fea/
https://www.jianshu.com/p/6d6c895c5cb9
https://blog.csdn.net/keketrtr/article/details/86643739
https://blog.csdn.net/weixin_42310458/article/details/125180410