首页 > 其他分享 >原生 API

原生 API

时间:2024-07-22 22:33:57浏览次数:5  
标签:原生 producer System API println new consumer out

同步发送

生产者

// 创建一个生产者  (制定一个组名)
DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
// 连接 name server
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 启动
producer.start();
// 创建10个消息,消息将分摊到 4 个队列上 2-2-3-3
for (int i = 0; i < 10; i++) {
    Message message = new Message("testTopic", "我是一个简单的消息".getBytes());
    // 发送消息
    SendResult sendResult = producer.send(message);
    System.out.println(sendResult.getSendStatus());
}
// 关闭生产者
producer.shutdown();

消费者

// 创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
// 连接namesrv
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 订阅一个主题  * 标识订阅这个主题中所有的消息,后面说 tag 过滤
consumer.subscribe("testTopic", "*");
// 设置一个监听器 (一直监听的, 异步回调方式),并发模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 这个就是消费的方法 (业务处理)
        System.out.println("我是消费者");
        System.out.println(msgs.get(0).toString());
        System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
        System.out.println("消费上下文:" + context);
        // 返回值 CONSUME_SUCCESS 成功,消息会从mq出队
        // 返回值 RECONSUME_LATER 失败, 消息会重新回到队列(默认消费重试16次)
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动
consumer.start();

异步发送

DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());
producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("发送成功");
    }

    @Override
    public void onException(Throwable e) {
        System.err.println("发送失败:" + e.getMessage());
    }
});
System.out.println("我先执行");

单向发送

DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("onewayTopic", "日志xxx".getBytes());
producer.sendOneway(message);
System.out.println("成功");
producer.shutdown();

批量发送

生产者

// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
// 设置nameServer地址
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 启动实例
producer.start();
List<Message> msgs = Arrays.asList(
        new Message("batchTopic", "我是一组消息的A消息".getBytes()),
        new Message("batchTopic", "我是一组消息的B消息".getBytes()),
        new Message("batchTopic", "我是一组消息的C消息".getBytes())
);
// 发送一组消息,这是一次发送,这批次的消息会被放到一个队列上
SendResult send = producer.send(msgs);
System.out.println(send);
// 关闭实例
producer.shutdown();

消费者

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("batchTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 会打印 3 次
        System.out.println("收到消息了" + new Date());
        System.out.println(msgs.size());
        System.out.println(new String(msgs.get(0).getBody()));
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

延迟消息

DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("orderMsTopic", "订单号,座位号".getBytes());
// 给消息设置一个延迟时间
// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
message.setDelayTimeLevel(3);
// 发延迟消息
producer.send(message);
System.out.println("发送时间" + new Date());
producer.shutdown();

顺序消息

  1. 因为消费者默认并发模式,也就是多线程,多线程是需要抢夺 cpu 调度权的,多线程天生不能保证顺序
  2. 就算改为单线程也不能保证顺序,假设 Topic 2 个队列,一个是空的一个有3条消息,消费者轮询取消息

所以要保证顺序就需要满足两个条件:1,单线程模式消费;2,消息放到一个队列或 topic 只设置一个队列

生产者

DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();

msgModels.forEach(msgModel -> {
    Message message = new Message("orderlyTopic", msgModel.toString().getBytes());
    try {
        // msgModel.getOrderSn() 订单号,表示同一个订单号的发到一个队列
        producer.send(message, new MessageQueueSelector() {
            // arg 就是 msgModel.getOrderSn()
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                int hashCode = arg.toString().hashCode();
                // 2 % 4 =2
                // 3 % 4 =3
                // 4 % 4 =0
                // 5 % 4 =1
                // 6 % 4 =2  周期性函数
                int i = hashCode % mqs.size();
                return mqs.get(i);
            }
        }, msgModel.getOrderSn());

    } catch (Exception e) {
        e.printStackTrace();
    }
});
producer.shutdown();
System.out.println("发送完成");

消费者

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("orderlyTopic", "*");
// MessageListenerConcurrently 并发模式 多线程的  重试16次
// MessageListenerOrderly 顺序模式 单线程的  无限重试(次数为 Integer.Max_Value)
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        System.out.println("线程id:" + Thread.currentThread().getId());
        System.out.println(new String(msgs.get(0).getBody()));
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();

带 Tag

生产者

DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());
Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());
producer.send(message);
producer.send(message2);
System.out.println("发送成功");
producer.shutdown();

消费者1

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 只接收 vip1 的消息
consumer.subscribe("tagTopic", "vip1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println("我是vip1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

消费者2

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// vip1 vip2 的消息都接收
consumer.subscribe("tagTopic", "vip1 || vip2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println("我是vip2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

带 Key

生产者

DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("keyTopic", "vip1", key, "我是vip1的文章".getBytes());
producer.send(message);
System.out.println("发送成功");
producer.shutdown();

消费者

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("keyTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        MessageExt messageExt = msgs.get(0);
        System.out.println("我是vip1的消费者,我正在消费消息" + new String(messageExt.getBody()));
        System.out.println("我们业务的标识:" + messageExt.getKeys());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

重试

生产者

DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
// 生产者发送消息 重试次数
producer.setRetryTimesWhenSendFailed(2);
producer.setRetryTimesWhenSendAsyncFailed(2);
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes());
producer.send(message);
System.out.println("发送成功");
producer.shutdown();

消费者

/**
 * 重试的时间间隔
 * 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 * 默认重试16次
 * 1.能否自定义重试次数
 * 2.如果重试了16次(并发模式) 顺序模式下(int最大值次)都是失败的?  是一个死信消息 则会放在一个死信主题中去 主题的名称:%DLQ%retry-consumer-group
 * 3.当消息处理失败的时候 该如何正确的处理?
 * --------------
 * 重试的次数一般 5次
 * @throws Exception
 */
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
// 设定重试次数
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        MessageExt messageExt = msgs.get(0);
        System.out.println(new Date());
        System.out.println(messageExt.getReconsumeTimes());
        System.out.println(new String(messageExt.getBody()));
        // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});
consumer.start();

标签:原生,producer,System,API,println,new,consumer,out
From: https://www.cnblogs.com/cyrushuang/p/18317114

相关文章

  • 在Spring Boot中实现API网关与路由
    在SpringBoot中实现API网关与路由大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨如何在SpringBoot中实现API网关与路由。API网关是一种用于管理和路由请求的中间层,它可以集中处理认证、路由、负载均衡、缓存等功能。SpringCloudGateway......
  • 云原生周刊:Kubernetes v1.31 中的移除和主要变更|2024.7.22
    开源项目ArgoRolloutsArgoRollouts是一个Kubernetes控制器和一组自定义资源定义(CRDs),提供高级部署功能,例如蓝绿部署、金丝雀部署、金丝雀分析、实验以及渐进式交付功能给Kubernetes。ArgoRollouts可选地集成了Ingress控制器和服务网格,利用它们的流量塑形能力,在更新期......
  • 后端API接口定义返回编码规则
    code状态码code返回状态码,一般小伙伴们是在开发的时候需要什么,就添加什么。如接口要返回用户权限异常,我们加一个状态码为101吧,下一次又要加一个数据参数异常,就加一个102的状态码。这样虽然能够照常满足业务,但状态码太凌乱了。我们应该可以参考HTTP请求返回的状态码(下面是常见的......
  • 深信服务超融合管理Api调用
    usingJint;usingSystem;usingSystem.Collections.Generic;usingSystem.ComponentModel;usingSystem.Data;usingSystem.Drawing;usingSystem.Linq;usingSystem.Text;usingSystem.Threading.Tasks;usingSystem.Windows.Forms;usingSystem.IO;usingSystem......
  • Go语言中使用K8s API及一些常用API整理
    Go语言中使用K8sAPI及一些常用API整理发布于 2022-05-0915:54:402K0举报文章被收录于专栏:devops_k8sGoClient在进入代码之前,理解k8s的goclient项目是对我们又帮助的。它是k8sclient中最古老的一个,因此具有很多特性。Client-go没有使用Swagger......
  • 如何修复包含 OpenAi api 的 Flask 服务器的 405 错误?
    我正在尝试向我的网页添加API,之前从未使用过任何Flask服务器,也从未使用过Javascript,所以这是一次全新的学习体验。我的问题是我不断收到405错误代码,指出该方法不被允许。我继续使用POST方法,但它不起作用,我相信我的问题可能更多地出在我的HTML代码而不是我的Flask服......
  • 在 VSCode 中通过 Python 使用 YouTube API 时如何启用 Intellisense
    我想在使用GoogleYouTubeAPI和Python时在VSCode中获得IntelliSense。但我不知道详细步骤。fromgoogleapiclient.discoveryimportbuildapi_key="****"youtube=build("youtube","v3",developerKey=api_key)request=youtube.channels().list(part......
  • 纳米体育数据API电竞数据接口:指数数据包接口文档API示例①
    纳米体育数据的数据接口通过JSON拉流方式获取200多个国家的体育赛事实时数据或历史数据的编程接口,无请求次数限制,可按需购买,接口稳定高效;覆盖项目包括足球、篮球、网球、电子竞技、奥运等专题、数据内容。纳米数据API2.0版本包含http协议以及websocket协议,主要通过http获取数......
  • 首发:vue开发微信小程序文本内容安全审核php api接口代码完整分享
    微信小程序的文本内容安全审核,有用户输入内容的小程序都用的上,本人自己项目开发使用的接入代码分享给大家,直接复制粘贴过去就能用,如果用的上记得点赞支持。上代码,配合之前发的access_token获取接口使用,把下面代码里面的token刷新接口地址换成你的即可。https://blog.csdn.net/......
  • LTC莱特币实时行情API接口
    LTC莱特币实时行情API接口#RestfulAPIhttps://tsanghi.com/api/fin/crypto/realtime?token={token}&ticker={ticker}指定加密货币代码,获取该加密货币的实时行情(开、高、低、收)。更新周期:实时。请求方式:GET。#莱特币测试接口https://tsanghi.com/api/fin/cr......