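1. First, prepare the JDK, rocketmq-dashboard-master, apache-maven-3.9.7-bin.tar.gz, and rocketmq-all-5.1.0-bin-release.zip. The dashboard is RocketMQ's visual management tool; it has to be built with Maven, which is why Maven is on the list.
2. Create an app directory (/app in the paths below) and extract all of the archives into it.
For Maven, edit the settings file (conf/settings.xml under the Maven directory) to switch to a mirror, and create a repository directory to serve as the local repository: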
<settings xmlns="http://maven.apache.org/SETTINGS/1.2.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.2.0 https://maven.apache.org/xsd/settings-1.2.0.xsd">
  <!-- localRepository
   | The path to the local repository maven will use to store artifacts.
   |
   | Default: ${user.home}/.m2/repository
  -->
  <localRepository>/app/repository</localRepository>
  <pluginGroups>
  </pluginGroups>
  <proxies>
  </proxies>
  <servers>
  </servers>
  <mirrors>
    <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>*</mirrorOf>
      <name>Nexus aliyun</name>
      <url>https://maven.aliyun.com/repository/public</url>
    </mirror>
  </mirrors>
  <profiles>
  </profiles>
</settings>
Edit /etc/profile and append the following lines at the very end of the file. After saving, run source /etc/profile to reload the environment.
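# Replace PUBLIC_IP with the server's public IP address.
# The *_HOME variables are defined before PATH so that PATH expands them correctly.
export JAVA_HOME=/app/jdk
export MAVEN_HOME=/app/apache-maven-3.9.7
export ROCKETMQ_HOME=/app/rocketmq-all-5.1.0-bin-release
export NAMESRV_ADDR=PUBLIC_IP:9876
export PATH=$MAVEN_HOME/bin:$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin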
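To confirm that the environment was actually reloaded, a small check like the one below can help. It is only a sketch: the class name EnvCheck and its helper method are made up for illustration, and it looks only at the four variables exported above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class EnvCheck {
    // The variables exported in /etc/profile in the step above.
    static final String[] REQUIRED = {"JAVA_HOME", "MAVEN_HOME", "ROCKETMQ_HOME", "NAMESRV_ADDR"};

    // Returns the names of required variables that are unset or blank in the given environment.
    public static List<String> missing(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> unset = missing(System.getenv());
        if (unset.isEmpty()) {
            System.out.println("All required variables are set.");
        } else {
            System.out.println("Missing variables: " + unset);
        }
    }
}
```

Run it from a shell in which source /etc/profile has already been executed; a shell opened earlier will not see the new values.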
Edit the broker configuration in /app/rocketmq-all-5.1.0-bin-release/conf (broker.conf, the file passed to mqbroker further down):
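# Allow topics to be created automatically
autoCreateTopicEnable=true
# NameServer address (replace PUBLIC_IP with the server's public IP address)
namesrvAddr=PUBLIC_IP:9876
brokerIP1=PUBLIC_IP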
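Because broker.conf is a plain key=value file, it can be sanity-checked with java.util.Properties before the broker is started. The sketch below is illustrative only: BrokerConfCheck and looksComplete are hypothetical names, the default path is the one used in this post, and the check assumes a single NameServer address in host:port form.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class BrokerConfCheck {
    // Parses key=value text; lines starting with '#' are treated as comments.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // True when namesrvAddr has the form host:port and brokerIP1 is not empty.
    public static boolean looksComplete(Properties props) {
        String namesrv = props.getProperty("namesrvAddr", "").trim();
        String brokerIp = props.getProperty("brokerIP1", "").trim();
        return namesrv.matches("[^:\\s]+:\\d+") && !brokerIp.isEmpty();
    }

    public static void main(String[] args) throws IOException {
        // Path assumed from the layout used in this post; pass another path as the first argument.
        String path = args.length > 0 ? args[0]
                : "/app/rocketmq-all-5.1.0-bin-release/conf/broker.conf";
        String text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        Properties props = parse(text);
        System.out.println("autoCreateTopicEnable = " + props.getProperty("autoCreateTopicEnable"));
        System.out.println("namesrvAddr = " + props.getProperty("namesrvAddr"));
        System.out.println("brokerIP1 = " + props.getProperty("brokerIP1"));
        System.out.println(looksComplete(props)
                ? "Configuration looks complete."
                : "namesrvAddr or brokerIP1 is missing or malformed.");
    }
}
```

Note that a placeholder such as PUBLIC_IP:9876 still passes this check, since it only looks at the shape of the values, not at whether the address is real.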
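Edit the runbroker.sh file (in the RocketMQ bin directory).
Edit the runserver.sh file (same directory). These two scripts hold the JVM options for the broker and the NameServer, so this is typically where the heap size is reduced to fit a small server.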
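In rocketmq-dashboard-master, build the project with Maven: mvn clean package -Dmaven.test.skip=true. If packaging fails, try mvn clean install -U -Dmaven.test.skip=true instead, which cleans the project, forces a dependency update, and rebuilds.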
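After the build succeeds, first change to the RocketMQ bin directory: cd /app/rocketmq-all-5.1.0-bin-release/bin/
Start the NameServer: nohup ./mqnamesrv &
Start the broker: nohup ./mqbroker -c ../conf/broker.conf &
Then change to the dashboard build directory: cd /app/rocketmq-dashboard-master/target/
Start rocketmq-dashboard: nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar &
Once everything has started, run the jps command to confirm that the processes are up.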
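Reading the jps output by eye is usually enough, but the check can also be scripted. The following is a rough sketch, not part of the original setup: JpsCheck is a hypothetical helper, and the expected process names (NamesrvStartup, BrokerStartup, and the dashboard jar file name) are assumptions about what jps prints for these three processes, so adjust them if the output on your server differs.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class JpsCheck {
    // Names assumed to appear in jps output for the three processes started above.
    static final String[] EXPECTED = {
        "NamesrvStartup", "BrokerStartup", "rocketmq-dashboard-1.0.1-SNAPSHOT.jar"
    };

    // Returns the expected names that do not appear in the given jps output ("<pid> <name>" per line).
    public static List<String> notRunning(String jpsOutput) {
        List<String> seen = new ArrayList<>();
        for (String line : jpsOutput.split("\\R")) {
            String[] parts = line.trim().split("\\s+", 2);
            if (parts.length == 2) {
                seen.add(parts[1]);
            }
        }
        List<String> missing = new ArrayList<>();
        for (String name : EXPECTED) {
            if (!seen.contains(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws Exception {
        // Runs jps from PATH and collects its output.
        Process process = new ProcessBuilder("jps").redirectErrorStream(true).start();
        StringBuilder output = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                output.append(line).append('\n');
            }
        }
        process.waitFor();
        List<String> missing = notRunning(output.toString());
        System.out.println(missing.isEmpty() ? "All three processes are running."
                                             : "Not running: " + missing);
    }
}
```

If a process is reported as not running, the nohup.out file in the directory where it was started is the first place to look.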
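On an Alibaba Cloud server, the following ports must be opened: 9876 for the NameServer, 10911 for the broker, and 8080 for the dashboard.
Note: if the BT Panel (宝塔) is installed, the same ports also have to be opened in its Security section; otherwise requests will not get through.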
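A quick way to verify from a client machine that the three ports are really open is to attempt a TCP connection to each one. The snippet below is a minimal sketch using only the JDK; PortCheck is a hypothetical class name, and the host must be supplied as the first argument (the server's public IP).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within the timeout.
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to localhost when no host is given.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int[] ports = {9876, 10911, 8080}; // NameServer, broker, dashboard
        for (int port : ports) {
            String state = isReachable(host, port, 3000) ? "open" : "NOT reachable";
            System.out.println(host + ":" + port + " is " + state);
        }
    }
}
```

A port that is reachable from the server itself but not from outside usually points to the cloud security group or the panel firewall rather than to RocketMQ.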
Next, create a Maven project and add the client dependency:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.1.0</version>
</dependency>
Create the producer:
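import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws MQClientException {
        // Producer group name
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        // Replace PUBLIC_IP with the server's public IP address
        producer.setNamesrvAddr("PUBLIC_IP:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            try {
                // Topic "SyncProducer", tag "TagA" + i, UTF-8 body
                Message msg = new Message("SyncProducer", "TagA" + i,
                        "Hello world".getBytes(StandardCharsets.UTF_8));
                // Synchronous send with an 8000 ms timeout
                SendResult sendResult = producer.send(msg, 8000);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Release resources once all messages have been sent
        producer.shutdown();
    }
}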
Create the consumer:
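import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SyncProducer");
        // Replace PUBLIC_IP with the server's public IP address
        consumer.setNamesrvAddr("PUBLIC_IP:9876");
        // Subscribe to every tag of the "SyncProducer" topic
        consumer.subscribe("SyncProducer", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        //consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}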
From: https://www.cnblogs.com/qiao88/p/18248818