-
打开IntelliJ IDEA,创建一个Java工程。
-
在pom.xml文件中添加以下依赖引入Java SDK的依赖库。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.4</version> </dependency>
-
在IDEA中打开已创建的Java工程,在src/main/java路径下创建一个Java类。
-
将Java类的内容替换为云消息队列 RocketMQ 版提供的消息订阅代码。示例代码如下:
package doc; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.shaded.org.slf4j.Logger; import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; public class PushConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class); private PushConsumerExample() { } public static void main(String[] args) throws ClientException, IOException, InterruptedException { /** * 实例接入点,从控制台实例详情页的接入点页签中获取。 * 如果是在阿里云ECS内网访问,建议填写VPC接入点。 * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。 */ String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080"; //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。 String topic = "Your Topic"; //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。 String consumerGroup = "Your ConsumerGroup"; final ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints); /** * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。 * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。 */ builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")); ClientConfiguration clientConfiguration = builder.build(); //订阅消息的过滤规则,表示订阅所有Tag的消息。 String tag = "*"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。 PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) //设置消费者分组。 .setConsumerGroup(consumerGroup) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) //设置消费监听器。 .setMessageListener(messageView -> { //处理消息并返回消费结果。 // LOGGER.info("Consume message={}", messageView); System.out.println("Consume Message: " + messageView); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); //如果不需要再使用PushConsumer,可关闭该进程。 //pushConsumer.close(); } }
-
在IDEA中打开已创建的Java工程,在src/main/java路径下创建一个Java类。
-
将Java类的内容替换为云消息队列 RocketMQ 版提供的消息发送代码。示例代码如下:
-
参考下表修改消费者示例代码中的参数值。以下参数仅为示例值,需要修改为您实际使用的参数值。
参数
示例
说明
endpoints
rmq-cn-******.cn-hangzhou.rmq.aliyuncs.com:8080
实例的公网接入点。
可从云消息队列 RocketMQ 版控制台实例详情页的TCP 协议接入点页签中获取。
本教程以公网环境接入为例,若您使用VPC网络接入,则接入点需要填写为VPC专有网络接入点。
topic
topic_normal
已创建的Topic的名称,表示消费者需要订阅指定的Topic的消息。
可从云消息队列 RocketMQ 版控制台的Topic 管理页面查看。
consumerGroup
GID_test
已创建的Group的ID,表示消费者使用该消费者分组订阅指定的Topic。
可从云消息队列 RocketMQ 版控制台的Group 管理页面查看。
Instance UserName
21Vshz0YD9******
本教程以公网环境接入为例,因此该参数填写为实例用户名。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
Instance Password
VrQCx2xr9a******
本教程以公网环境接入为例,因此该参数填写为实例密码。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
-
参数修改完成后,运行消费者示例代码,启动消费者客户端。返回类似如下信息,表示消费者客户端已接入云消息队列 RocketMQ 版服务端。
此操作为第一次启动消费者,此时生产者还未接入服务端,因此还未获取到消息为正常结果。
package doc; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; public class ProducerExample { public static void main(String[] args) throws ClientException { /** * 实例接入点,从控制台实例详情页的接入点页签中获取。 * 如果是在阿里云ECS内网访问,建议填写VPC接入点。 * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。 */ String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080"; //消息发送的目标Topic名称,需要提前在控制台创建,如果不创建直接使用会返回报错。 String topic = "Your Topic"; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints); /** * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。 * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。 */ builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")); ClientConfiguration configuration = builder.build(); /** * 初始化Producer时直接配置需要使用的Topic列表(这个参数可以配置多个Topic),实现提前检查错误配置、拦截非法配置启动。 * 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的Topic是否合法。 * 注意!!!事务消息Topic必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。 */ Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); //普通消息发送。 Message message = provider.newMessageBuilder() .setTopic(topic) //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") //消息体。 .setBody("messageBody".getBytes()) .build(); try { //发送消息,需要关注发送结果,并捕获失败等异常。 SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); } } }
-
-
参考下表修改生产者示例代码中的参数值。以下参数仅为示例值,需要修改为您实际使用的参数值。
参数
示例
说明
endpoints
rmq-cn-******.cn-hangzhou.rmq.aliyuncs.com:8080
实例的公网接入点。
可从云消息队列 RocketMQ 版控制台实例详情页的TCP 协议接入点页签中获取。
本教程以公网环境接入为例,若您使用VPC网络接入,则接入点需要填写为VPC专有网络接入点。
topic
topic_normal
已创建的Topic的名称,表示生产者向哪个Topic发送消息。
可从云消息队列 RocketMQ 版控制台的Topic管理页面查看。
Instance UserName
21Vshz0YD9******
本教程以公网环境接入为例,因此该参数填写为实例用户名。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
Instance Password
VrQCx2xr9a******
本教程以公网环境接入为例,因此该参数填写为实例密码。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
-
参数修改完成后,运行生产者示例代码,启动生产者客户端发送消息。返回类似如下信息,表示生产者客户端已接入云消息队列 RocketMQ 版服务端并成功发送消息。
-
在IDEA中打开已创建的Java工程,在src/main/java路径下创建一个Java类。
-
将Java类的内容替换为云消息队列 RocketMQ 版提供的消息发送代码。示例代码如下:
package doc; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; public class ProducerExample { public static void main(String[] args) throws ClientException { /** * 实例接入点,从控制台实例详情页的接入点页签中获取。 * 如果是在阿里云ECS内网访问,建议填写VPC接入点。 * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。 */ String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080"; //消息发送的目标Topic名称,需要提前在控制台创建,如果不创建直接使用会返回报错。 String topic = "Your Topic"; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints); /** * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。 * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。 */ builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")); ClientConfiguration configuration = builder.build(); /** * 初始化Producer时直接配置需要使用的Topic列表(这个参数可以配置多个Topic),实现提前检查错误配置、拦截非法配置启动。 * 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的Topic是否合法。 * 注意!!!事务消息Topic必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。 */ Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); //普通消息发送。 Message message = provider.newMessageBuilder() .setTopic(topic) //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") //消息体。 .setBody("messageBody".getBytes()) .build(); try { //发送消息,需要关注发送结果,并捕获失败等异常。 SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); } } }
-
参考下表修改生产者示例代码中的参数值。以下参数仅为示例值,需要修改为您实际使用的参数值。
参数
示例
说明
endpoints
rmq-cn-******.cn-hangzhou.rmq.aliyuncs.com:8080
实例的公网接入点。
可从云消息队列 RocketMQ 版控制台实例详情页的TCP 协议接入点页签中获取。
本教程以公网环境接入为例,若您使用VPC网络接入,则接入点需要填写为VPC专有网络接入点。
topic
topic_normal
已创建的Topic的名称,表示生产者向哪个Topic发送消息。
可从云消息队列 RocketMQ 版控制台的Topic管理页面查看。
Instance UserName
21Vshz0YD9******
本教程以公网环境接入为例,因此该参数填写为实例用户名。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
Instance Password
VrQCx2xr9a******
本教程以公网环境接入为例,因此该参数填写为实例密码。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
-
参数修改完成后,运行生产者示例代码,启动生产者客户端发送消息。返回类似如下信息,表示生产者客户端已接入云消息队列 RocketMQ 版服务端并成功发送消息。
-
-
查看消息消费结果
消息发送成功后,将运行结果窗口切换到消息订阅运行程序中,会看到返回如下消费结果,您可以根据返回的消息ID在控制台查询消息轨迹。
查询消息轨迹
-
登录云消息队列 RocketMQ 版控制台,在左侧导航栏,选择实例列表。
-
在顶部菜单栏,选择和试用实例相同的地域。
-
在实例列表页面单击试用实例的名称。
-
在左侧导航栏单击消息轨迹,然后单击创建查询任务。
-
在创建消息轨迹查询任务面板中选择消息所属的Topic、选择查询方式为Message ID 查询、输入在消费结果中获取到的Message ID,然后单击确定创建查询任务。
-
刷新页面,待查询任务的状态变为查询完成,然后在其操作列单击查询结果。
-
在查询结果页面中,单击查询结果列表操作列的消息轨迹。
-
您可以在轨迹详情页面查看指定消息在各阶段的详细状态。
各返回参数的详细信息,请参见轨迹参数说明。
本教程使用的标准版实例只能免费试用1个月。试用期结束后您可以选择释放实例或一键转包年包月,否则超过有效期的部分将会按照按量付费方式进行计费。
-
如果您无需使用云消息队列 RocketMQ 版,请按照如下操作及时清理和释放资源。
-
登录云消息队列RocketMQ版控制台,在左侧导航栏选择实例列表。
-
在顶部菜单栏选择试用实例所在的地域,然后在目标试用实例所在的操作列选择更多>释放。
-
在弹出的对话框中单击确定。
-
-
如果您需要继续使用云消息队列 RocketMQ 版,可以将该试用实例转为包年包月计费类型。
-
登录云消息队列RocketMQ版控制台,在左侧导航栏选择实例列表。
-
在顶部菜单栏选择试用实例所在的地域,然后在目标试用实例所在的付费类型列选择转包年包月。
-
按照界面提示完成购买。
-
常用知识点
问题1:如果使用本地网络访问云消息队列 RocketMQ 版服务,SDK代码中的接入点(endpoints)应该填写哪个?(单选题)
问题2:发送普通消息时,创建的Topic的消息类型是什么?(单选题)
延伸阅读
-