首页 > 其他分享 >手写MQ消息中间件(基于netty实现)

手写MQ消息中间件(基于netty实现)

时间:2022-12-28 22:00:52浏览次数:36  
标签:netty mayikt new MQ io 消息中间件 import channel


通讯协议,什么是协议?

如:生产者投递消息到mq服务器之间的通讯如何定义格式就叫协议,(mq协议是amqp协议,是mq作者自己起的通讯协议名称)。

流程图

实现流程:

1.生产者把消息投递到netty服务,消费者自己取。

2.生产者把消息投递到netty服务,自动发送给消费者。


目录结构

目录

流程图

目录结构

MarshallingCodeCFactory

NettyMQServer

NettyMQProducer

NettyMQConsumer

NettyMQServerHandler

NettyMQProducerHandler

NettyMQConsumerHandler

DeliveryInfoEntity


MarshallingCodeCFactory

jboss解码器  二进制转对象、对象转二进制
  1. package com.mayikt.utils;
  2. import io.netty.handler.codec.marshalling.*;
  3. import org.jboss.marshalling.MarshallerFactory;
  4. import org.jboss.marshalling.Marshalling;
  5. import org.jboss.marshalling.MarshallingConfiguration;
  6. /**
  7. * jboss解码器
  8. * 二进制转对象、对象转二进制
  9. */
  10. public final class MarshallingCodeCFactory {
  11. /**
  12. * 创建Jboss Marshalling解码器MarshallingDecoder
  13. * @return MarshallingDecoder
  14. */
  15. public static MarshallingDecoder buildMarshallingDecoder() {
  16. //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
  17. final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
  18. //创建了MarshallingConfiguration对象,配置了版本号为5
  19. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  20. configuration.setVersion(5);
  21. //根据marshallerFactory和configuration创建provider
  22. UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
  23. //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
  24. MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
  25. return decoder;
  26. }
  27. /**
  28. * 创建Jboss Marshalling编码器MarshallingEncoder
  29. * @return MarshallingEncoder
  30. */
  31. public static MarshallingEncoder buildMarshallingEncoder() {
  32. final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
  33. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  34. configuration.setVersion(5);
  35. MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
  36. //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
  37. MarshallingEncoder encoder = new MarshallingEncoder(provider);
  38. return encoder;
  39. }
  40. }

NettyMQServer

netty服务器 - MQ服务器
  1. package com.mayikt.mq;
  2. import com.mayikt.handler.NettyMQServerHandler;
  3. import com.mayikt.utils.MarshallingCodeCFactory;
  4. import io.netty.bootstrap.ServerBootstrap;
  5. import io.netty.channel.ChannelFuture;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. /**
  11. * netty服务器 - MQ服务器
  12. */
  13. public class NettyMQServer {
  14. public static void main(String[] args) {
  15. start(5872);
  16. }
  17. public static void start(int port) {
  18. /**
  19. * 客户端创建两个线程池组分别为 boss线程组和工作线程组
  20. */
  21. // 1.用于接受客户端连接的请求 (并没有处理请求)
  22. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  23. // 2.用于处理客户端连接的读写操作
  24. NioEventLoopGroup workGroup = new NioEventLoopGroup();
  25. // 3.用于创建我们的ServerBootstrap
  26. ServerBootstrap serverBootstrap = new ServerBootstrap();
  27. serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
  28. .childHandler(new ChannelInitializer<SocketChannel>() {
  29. @Override
  30. protected void initChannel(SocketChannel ch) throws Exception {
  31. // 解决netty可以支持传输对象
  32. ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
  33. ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
  34. ch.pipeline().addLast(new NettyMQServerHandler());
  35. }
  36. });
  37. // 绑定我们的端口号码
  38. try {
  39. // 绑定端口号,同步等待成功
  40. ChannelFuture future = serverBootstrap.bind(port).sync();
  41. System.out.println("MQ服务器启动成功:" + port);
  42. // 等待服务器监听端口
  43. future.channel().closeFuture().sync();
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. } finally {
  47. // 优雅的关闭连接
  48. bossGroup.shutdownGracefully();
  49. workGroup.shutdownGracefully();
  50. }
  51. }
  52. }

NettyMQProducer

netty客户端 - 生产者投递消息

  1. package com.mayikt.mq;
  2. import com.mayikt.entity.DeliveryInfoEntity;
  3. import com.mayikt.handler.NettyMQProducerHandler;
  4. import com.mayikt.handler.NettyMQServerHandler;
  5. import com.mayikt.utils.MarshallingCodeCFactory;
  6. import io.netty.bootstrap.Bootstrap;
  7. import io.netty.bootstrap.ServerBootstrap;
  8. import io.netty.channel.ChannelFuture;
  9. import io.netty.channel.ChannelInitializer;
  10. import io.netty.channel.nio.NioEventLoopGroup;
  11. import io.netty.channel.socket.SocketChannel;
  12. import io.netty.channel.socket.nio.NioServerSocketChannel;
  13. import io.netty.channel.socket.nio.NioSocketChannel;
  14. import java.net.InetSocketAddress;
  15. /**
  16. * netty客户端 - 消费者
  17. */
  18. public class NettyMQProducer {
  19. // netty服务端ip、端口
  20. private static final String host = "127.0.0.1";
  21. private static final int port = 5872;
  22. public static void main(String[] args) {
  23. sendMsg("mayikt", "每特123");
  24. System.out.println("111");
  25. sendMsg("mayikt", "每特123");
  26. }
  27. public static void sendMsg(String queueName, String msg) {
  28. //创建nioEventLoopGroup
  29. NioEventLoopGroup group = new NioEventLoopGroup();
  30. Bootstrap bootstrap = new Bootstrap();
  31. bootstrap.group(group).channel(NioSocketChannel.class)
  32. .remoteAddress(new InetSocketAddress(host, port))
  33. .handler(new ChannelInitializer<SocketChannel>() {
  34. @Override
  35. protected void initChannel(SocketChannel ch) throws Exception {
  36. ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
  37. ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
  38. ch.pipeline().addLast(new NettyMQProducerHandler());
  39. }
  40. });
  41. try {
  42. // 发起同步连接
  43. ChannelFuture sync = bootstrap.connect().sync();
  44. DeliveryInfoEntity deliveryInfoEntity = new DeliveryInfoEntity(msg, queueName,
  45. true);
  46. // 发送数据给netty服务端
  47. sync.channel().writeAndFlush(deliveryInfoEntity);
  48. sync.channel().closeFuture().sync();
  49. } catch (Exception e) {
  50. } finally {
  51. group.shutdownGracefully();
  52. }
  53. }
  54. }

NettyMQConsumer

netty客户端 - 消费者接收指定队列消息

  1. package com.mayikt.mq;
  2. import com.mayikt.entity.DeliveryInfoEntity;
  3. import com.mayikt.handler.NettyMQConsumerHandler;
  4. import com.mayikt.utils.MarshallingCodeCFactory;
  5. import io.netty.bootstrap.Bootstrap;
  6. import io.netty.channel.ChannelFuture;
  7. import io.netty.channel.ChannelInitializer;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.SocketChannel;
  10. import io.netty.channel.socket.nio.NioSocketChannel;
  11. import java.net.InetSocketAddress;
  12. /**
  13. * netty客户端 - 生产者投递消息
  14. */
  15. public class NettyMQConsumer {
  16. // netty服务端ip、端口
  17. private static final String host = "127.0.0.1";
  18. private static final int port = 5872;
  19. private static String queueName = "mayikt";
  20. public static void main(String[] args) {
  21. sendMsg("mayikt",null);
  22. }
  23. public static void sendMsg(String queueName,String msg) {
  24. //创建nioEventLoopGroup
  25. NioEventLoopGroup group = new NioEventLoopGroup();
  26. Bootstrap bootstrap = new Bootstrap();
  27. bootstrap.group(group).channel(NioSocketChannel.class)
  28. .remoteAddress(new InetSocketAddress(host, port))
  29. .handler(new ChannelInitializer<SocketChannel>() {
  30. @Override
  31. protected void initChannel(SocketChannel ch) throws Exception {
  32. ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
  33. ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
  34. ch.pipeline().addLast(new NettyMQConsumerHandler());
  35. }
  36. });
  37. try {
  38. // 发起同步连接
  39. ChannelFuture sync = bootstrap.connect().sync();
  40. DeliveryInfoEntity deliveryInfoEntity = new DeliveryInfoEntity(null, queueName,
  41. false);
  42. // 发送数据给netty服务端
  43. sync.channel().writeAndFlush(deliveryInfoEntity);
  44. sync.channel().closeFuture().sync();
  45. } catch (Exception e) {
  46. } finally {
  47. group.shutdownGracefully();
  48. }
  49. }
  50. }

NettyMQServerHandler

netty服务Handler  mq服务Handler

  1. package com.mayikt.handler;
  2. import com.mayikt.entity.DeliveryInfoEntity;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import org.apache.commons.lang3.StringUtils;
  6. import java.util.*;
  7. /**
  8. * netty服务Handler
  9. */
  10. public class NettyMQServerHandler extends SimpleChannelInboundHandler<DeliveryInfoEntity> {
  11. /**
  12. * mq存放所有的队列 key:队列名称,value:队列数据内容
  13. */
  14. private static Map<String, Queue> queues = new HashMap<String, Queue>();
  15. /**
  16. * 存放我们消费者连接(点对点) key:队列名称,value:管道
  17. */
  18. private static Map<String, ChannelHandlerContext> ctxs = new HashMap<String, ChannelHandlerContext>();
  19. /**
  20. * Netty的服务器端接受 客户端消息 MQ服务器端
  21. *
  22. * @param ctx 管道
  23. * @param dInfo 客户端消息封装的Entity
  24. * @throws Exception
  25. */
  26. @Override
  27. protected void channelRead0(ChannelHandlerContext ctx, DeliveryInfoEntity dInfo) throws Exception {
  28. // 队列名称
  29. String queueName = dInfo.getQueueName();
  30. if (StringUtils.isEmpty(queueName)) {
  31. return;
  32. }
  33. Boolean connType = dInfo.getConnType();
  34. // 如果是为true的情况下 为生产者角色
  35. if (connType) {
  36. // 处理生产者角色(投递消息)
  37. procucterService(queueName, dInfo.getMsg(), ctx);
  38. return;
  39. }
  40. consumer(queueName, ctx);
  41. }
  42. private void procucterService(String queueName, String msg, ChannelHandlerContext channelHandlerContext) {
  43. Queue queue = queues.get(queueName);
  44. if (queue == null) {
  45. // 如果队列不存在的情况下,就创建
  46. queue = new LinkedList();
  47. queues.put(queueName, queue);
  48. }
  49. // 将消息缓存到队列中
  50. queue.offer(msg);
  51. // ack应答(消息是否投递成功)
  52. channelHandlerContext.writeAndFlush("“ack应答”消息投递成功");
  53. // 主动将消息推送给消费者
  54. ChannelHandlerContext ctx = ctxs.get(queueName);
  55. if (ctx != null) {
  56. ctx.writeAndFlush(queue.poll());
  57. }
  58. }
  59. /**
  60. * 消费和mq建立连接主动拉取消息
  61. */
  62. private void consumer(String queueName, ChannelHandlerContext ctx) {
  63. Queue queue = queues.get(queueName);
  64. if (queue == null) {
  65. return;
  66. }
  67. // 获取队列中消息
  68. Object poll = queue.poll();
  69. ctx.writeAndFlush(poll);
  70. // 将消费者连接存放到集合中
  71. ctxs.put(queueName, ctx);
  72. }
  73. }

NettyMQProducerHandler

生产者Handler

  1. package com.mayikt.handler;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. public class NettyMQProducerHandler extends SimpleChannelInboundHandler {
  5. protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object ack) throws Exception {
  6. System.out.println(ack);
  7. // 关闭连接
  8. channelHandlerContext.close();
  9. }
  10. }

NettyMQConsumerHandler

消费者Handler

  1. package com.mayikt.handler;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. /**
  5. * 消费者Handler
  6. */
  7. public class NettyMQConsumerHandler extends SimpleChannelInboundHandler {
  8. @Override
  9. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  10. System.out.println("消费者获取生产者消息:" + msg);
  11. }
  12. }

DeliveryInfoEntity

传输协议的封装Entity - 传输协议如:amqp

  1. package com.mayikt.entity;
  2. import lombok.Data;
  3. import java.io.Serializable;
  4. /**
  5. * 传输协议封装Entity - 传输协议如:amqp
  6. */
  7. @Data
  8. public class DeliveryInfoEntity implements Serializable {
  9. /**
  10. * 发送消息内容
  11. */
  12. private String msg;
  13. /**
  14. * 队列名称
  15. */
  16. private String queueName;
  17. /**
  18. * true 生产者投递消息
  19. * false 消费者获取消息
  20. */
  21. private Boolean connType;
  22. public DeliveryInfoEntity(String msg, String queueName, Boolean connType) {
  23. this.msg = msg;
  24. this.queueName = queueName;
  25. this.connType = connType;
  26. }
  27. }

标签:netty,mayikt,new,MQ,io,消息中间件,import,channel
From: https://www.cnblogs.com/LoveShare/p/17011362.html

相关文章

  • 7张图揭晓RocketMQ存储设计的精髓
    简介:RocketMQ作为一款基于磁盘存储的中间件,具有无限积压能力,并提供高吞吐、低延迟的服务能力,其最核心的部分必然是它优雅的存储设计。RocketMQ作为一款基于磁盘存储的中......
  • Linux-CentOS7安装RocketMQ
    下载地址https://archive.apache.org/dist/rocketmq/基础安装#解压unziprocketmq-all-4.7.0-bin-release.zip修改bin/runserver.shJAVA_OPT="${JAVA_OPT}-server......
  • 一分钟搞定Netty 三大组件,如果搞不定,再看3遍
    1.三大组件简介Channel与BufferJavaNIO系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到IO设备(例如:文件、套接字)的连接。若需要使用NIO系......
  • RocketMq基础
    各个mq比较ActiveMq和RabbitMq单机吞吐量是万级的,RocketMq和kafka是十万级。ActiveMq可能丢失数据,RocketMq可以做到零丢失。 RocketMq四个核心组成部分NameService:......
  • RabbitMQ从入门到精通-工作队列-Work Queues
         工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队......
  • Netty4底层用对象池和不用对象池实践优化
    随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓冲区Buffer,情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作。......
  • netty的异常分析 IllegalReferenceCountException refCnt: 0, decrement: 1
    在websocket关闭时经常会抛出如下异常: IllegalReferenceCountExceptionrefCnt:0,decrement:1io.netty.util.IllegalReferenceCountException:refCnt:0,decrement:......
  • 高并发下Netty4底层bug导致直接内存溢出分析
    事故记录:10点游戏开服,迅速冲破2300+单区同时在线18点15分,运营反应玩家进不了,准备吃饭的人被抓回来排查故障发现,由于直接内存被占满,一直在FullGC,并且回收不掉,所以完全不......
  • 在Netty底层监控消息发送到Socket的时间
    1、调用writeAndFlush方法之后获取ChannelFuture;2、新增消息发送ChannelFutureListener,监听消息发送结果,如果消息写入网络Socket成功,则Netty会回调C......
  • Netty4 读写水位控制分析
    服务器上看看默认是多少ConfigurehighandlowwritewatermarksSetsane WRITE_BUFFER_HIGH_WATER_MARK andWRITE_BUFFER_LOW_WATER_MARKServerBootstrapbootstrap=......