首页 > 其他分享 >Netty 客户端与服务端收发消息demo

Netty 客户端与服务端收发消息demo

时间:2024-05-22 08:57:38浏览次数:37  
标签:Netty netty demo xcspringboot 服务端 io new import channel

客户端与服务端收发消息,要实现的具体功能是:在控制台输入一条消息之后按回车键,校验完客户端的登录状态之后,把消息发送到服务端;服务端收到消息之后打印,并向客户端发送一条消息,客户端收到消息之后打印。

 

客户端NettyClient

import com.xc.xcspringboot.x_netty.client.handler.*;
import com.xc.xcspringboot.x_netty.protocol.PacketCodec;
import com.xc.xcspringboot.x_netty.protocol.request.MessageRequestPacket;
import com.xc.xcspringboot.x_netty.util.LoginUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;

public class NettyClient {

    private static final int MAX_RETRY = 5;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8000;

    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        // 引导器引导启动
        bootstrap
                .group(workerGroup) // 指定线程模型,驱动连接的数据读写
                .channel(NioSocketChannel.class) // 指定IO模型为NioSocketChannel,表示IO模型为NIO
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 表示连接的超时时间,超过这个时间,如果仍未连接到服务端,则表示连接失败。
                .option(ChannelOption.SO_KEEPALIVE, true) // 表示是否开启TCP底层心跳机制,true表示开启。
                .option(ChannelOption.TCP_NODELAY, true) // 表示是否开始Nagle算法,true表示关闭,false表示开启。
                .handler(new ChannelInitializer<Channel>() { // 给引导类指定一个Handler,主要定义连接的业务处理逻辑
                    @Override
                    protected void initChannel(Channel channel) {
//                        channel.pipeline().addLast(new StringEncoder());
//                        channel.pipeline().addLast(new FirstClientHandler());
                        channel.pipeline().addLast(new ClientHandler());
                    }
                });

        // 建立通道
        /*
         * 配置完线程模型、IO模型、业务处理逻辑之后,调用connect方法进行连接,
         * 可以看到connect方法有两个参数,第一个参数可以填写IP或者域名,第二个参数填写端口号。
         * 由于connect方法返回的是一个Future,也就是说这个方法是异步的,通过addListener方法可以监听连接是否成功,进而打印连接信息。
         */
//        Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();

        connect(bootstrap, HOST, PORT, MAX_RETRY);

    }

    private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
        bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println(new Date() + ": 连接成功,启动控制台线程……");
                Channel channel = ((ChannelFuture) future).channel();
                startConsoleThread(channel);
            } else if (retry == 0) {
                System.err.println("重试次数已用完,放弃连接!");
            } else {
                // 第几次重连
                int order = (MAX_RETRY - retry) + 1;
                // 本次重连的间隔
                int delay = 1 << order;
                System.err.println(new Date() + ": 连接失败,第" + order + "次重连…… delay:" + delay);
                /*
                 * 定时任务调用的是bootstrap.config().group().schedule(),
                 * 其中bootstrap.config()这个方法返回的是BootstrapConfig,它是对Bootstrap配置参数的抽象,
                 * 然后bootstrap.config().group()返回的就是我们在一开始配置的线程模型workerGroup,
                 * 调用workerGroup的schedule方法即可实现定时任务逻辑。
                 */
                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
    }

    /*private static void startConsoleThread(Channel channel) throws InterruptedException {
        while (true) {
            channel.writeAndFlush(new Date() + " Hello world");
            Thread.sleep(2000);
        }
    }*/

    private static void startConsoleThread(Channel channel) {
        new Thread(() -> {
            while (!Thread.interrupted()) {
                if (LoginUtil.hasLogin(channel)) { // 在判断是否登录成功的时候取出这个标志位
                    System.out.println("输入消息发送至服务端: ");
                    Scanner SC = new Scanner(System.in);
                    String line = SC.nextLine();
                    MessageRequestPacket packet = new MessageRequestPacket();
                    packet.setMessage(line);
                    ByteBuf buffer = channel.alloc().buffer();
                    PacketCodec.INSTANCE.encode(buffer, packet);
                    channel.writeAndFlush(buffer);
                }
            }
        }).start();
    }

}

 

ClientHandler

import com.xc.xcspringboot.x_netty.protocol.Packet;
import com.xc.xcspringboot.x_netty.protocol.PacketCodec;
import com.xc.xcspringboot.x_netty.protocol.request.LoginRequestPacket;
import com.xc.xcspringboot.x_netty.protocol.response.LoginResponsePacket;
import com.xc.xcspringboot.x_netty.protocol.response.MessageResponsePacket;
import com.xc.xcspringboot.x_netty.util.LoginUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class ClientHandler extends ChannelInboundHandlerAdapter {

    // 当 Channel 已经连接/绑定并且已经就绪时调用
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(new Date() + ":客户端开始登录");
        //创建登录对象
        LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
//        loginRequestPacket.setUserId(UUID.randomUUID().toString());
        loginRequestPacket.setUserName("flash");
        loginRequestPacket.setPassword("pwd");
        //编码
        ByteBuf buffer = ctx.alloc().buffer();
        PacketCodec.INSTANCE.encode(buffer, loginRequestPacket);
        // 写数据
        ctx.channel().writeAndFlush(buffer);
    }

    // 当从 Channel 读取数据时被调用
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        Packet packet = PacketCodec.INSTANCE.decode(byteBuf);
        if (packet instanceof LoginResponsePacket) { // 登录逻辑
            LoginResponsePacket loginResponsePacket = (LoginResponsePacket) packet;
            if (loginResponsePacket.isSuccess()) {
                LoginUtil.markAsLogin(ctx.channel()); // 在登录成功之后,给Channel绑定一个登录成功的标志位
                System.out.println(new Date() + ":客户端登录成功");
            } else {
                System.out.println(new Date() + ":客户端登录失败,原因:" + loginResponsePacket.getReason());
            }
        } else if (packet instanceof MessageResponsePacket) { // 消息逻辑
            MessageResponsePacket messageResponsePacket = (MessageResponsePacket) packet;
            System.out.println(new Date() + ":收到服务端的消息:" + messageResponsePacket.getMessage());
        }
    }

}

  

服务端

import com.xc.xcspringboot.x_netty.server.handler.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Date;

public class NettyServer {

    private static final int PORT = 8000;

    public static void main(String[] args) {
        NioEventLoopGroup boosGroup = new NioEventLoopGroup(); // bossGroup表示监听端口,接收新连接的线程组
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // workerGroup表示处理每一个连接的数据读写的线程组

        ServerBootstrap serverBootstrap = new ServerBootstrap(); // 引导类ServerBootstrap,这个类将引导服务端的启动工作
        serverBootstrap
                .group(boosGroup, workerGroup) // .group(bossGroup,workerGroup)给引导类配置两大线程组
                .channel(NioServerSocketChannel.class) // 指定服务端的IO模型为NIO NioServerSocketChannel是对NIO类型连接的抽象
                .option(ChannelOption.SO_BACKLOG, 1024) // 表示系统用于临时存放已完成三次握手的请求的队列的最大长度
                .childOption(ChannelOption.SO_KEEPALIVE, true) // 表示是否开启TCP底层心跳机制,true表示开启。
                .childOption(ChannelOption.TCP_NODELAY, true) // 表示是否开启Nagle算法,true表示关闭,false表示开启
                .handler(new ChannelInitializer<NioServerSocketChannel>() { // handler()方法用于指定在服务端启动过程中的一些逻辑
                    @Override
                    protected void initChannel(NioServerSocketChannel ch) {
                        System.out.println("服务端启动过程中...");
                    }
                })
                .childHandler(new ChannelInitializer<NioSocketChannel>() { // childHandler()方法,给这个引导类创建一个ChannelInitializer,主要是定义后续每个连接的数据读写
                    protected void initChannel(NioSocketChannel ch) { // 泛型参数NioSocketChannel,这个类就是Netty对NIO类型连接的抽象
                        /*ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                System.out.println(msg);
                            }
                        });*/
//                        ch.pipeline().addLast(new FirstServerHandler());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });
        /*
         * 要启动一个Netty服务端,必须要指定三类属性,分别是线程模型、IO模型、连接读写处理逻辑。
         * 有了这三者,之后再调用bind(8000),就可以在本地绑定一个8000端口启动服务端。
         */
//                .bind(8000);

        // 给这个ChannelFuture添加一个监听器GenericFutureListener
        serverBootstrap.bind(PORT).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println(new Date() + ": 端口[" + PORT + "]绑定成功!");
            } else {
                System.err.println("端口[" + PORT + "]绑定失败!");
            }
        });
    }

}

  

ServerHandler

import com.xc.xcspringboot.x_netty.protocol.Packet;
import com.xc.xcspringboot.x_netty.protocol.PacketCodec;
import com.xc.xcspringboot.x_netty.protocol.request.LoginRequestPacket;
import com.xc.xcspringboot.x_netty.protocol.request.MessageRequestPacket;
import com.xc.xcspringboot.x_netty.protocol.response.LoginResponsePacket;
import com.xc.xcspringboot.x_netty.protocol.response.MessageResponsePacket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    // 当从 Channel 读取数据时被调用
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf requestByteBuf = (ByteBuf) msg;
        Packet packet = PacketCodec.INSTANCE.decode(requestByteBuf); // 解码
        ByteBuf buffer = ctx.alloc().buffer();
        if (packet instanceof LoginRequestPacket) { // 处理登录
            LoginRequestPacket loginRequestPacket = (LoginRequestPacket) packet;
            LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
            loginResponsePacket.setVersion(packet.getVersion());
            //登录校验
            if (valid(loginRequestPacket)) {
                //校验成功
                loginResponsePacket.setSuccess(true);
            } else {
                //校验失败
                loginResponsePacket.setReason("账号密码校验失败");
                loginResponsePacket.setSuccess(false);
            }
            //编码
            PacketCodec.INSTANCE.encode(buffer, loginResponsePacket);
            ctx.channel().writeAndFlush(buffer);
        } else if (packet instanceof MessageRequestPacket) { //处理消息
            MessageRequestPacket messageRequestPacket = ((MessageRequestPacket) packet);
            System.out.println(new Date() + ":收到客户端消息: " + messageRequestPacket.getMessage());
            MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
            messageResponsePacket.setMessage("服务端回复[" + messageRequestPacket.getMessage() + "] ");
            PacketCodec.INSTANCE.encode(buffer, messageResponsePacket);
            ctx.channel().writeAndFlush(buffer);
        }
    }


    private boolean valid(LoginRequestPacket loginRequestPacket) {
        return true;
    }

}

  

 

标签:Netty,netty,demo,xcspringboot,服务端,io,new,import,channel
From: https://www.cnblogs.com/ooo0/p/18205381

相关文章

  • netty 最简demo
    Netty是什么Netty到底是何方神圣?用一句简单的话来说就是:Netty封装了JDK的NIO,让你用得更方便,不用再写一大堆复杂的代码了。用官方正式的话来说就是:Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务端和客户端。使用Netty而不使用JDK原生NIO的原因1.使用J......
  • qt之点的绘制示例demo
    #include"mainwindow.h"#include"ui_mainwindow.h"#include<QPainter>#include<QColor>QColorm_color;intm_x=0;intm_y=0;intm_w=0;intm_h=0;MainWindow::MainWindow(QWidget*parent):QMainWindow(parent)......
  • ai调用demo
    fromflaskimportFlask,request,jsonifyfromflask_docsimportApiDocfromopenaiimportAzureOpenAIimportosfromflask_docsimportApiDocfromflask_siwadocimportSiwaDocfromflasggerimportSwaggerapp=Flask(__name__)siwa=SiwaDoc(app)ApiDo......
  • Mysql - 数据库时区是客户端属性还是服务端属性
    一、说明同事问我数据库的时区是客户端属性还是服务端属性,我觉得这个问题十分有意思,之前没怎么留意,自己来做下实验。首先介绍几个术语。GMT(GreenwichMeanTime),格林尼治平均时间。UTC(CoordinatedUniversalTime),协调世界时。CST(ChinaStandardTime),中国标准时间,也称北京时间......
  • java netty 实现 websocket 服务端和客户端双向通信 实现心跳和断线重连 完整示例
    javanetty实现websocket服务端和客户端双向通信实现心跳和断线重连完整示例maven依赖<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.97.Final</version></dependency>服务端一个接口IGet......
  • 上百页html生成pdf解决方案(bookjs-easy)简洁完整版(包含接收服务端返回路径参数)
    依靠1:客户端插件 bookjs-easy(点击直接跳转官网)2:服务端插件screenshot-api-server实测105页的pdf,生成耗时40s左右,文件大小16MB项目需求:生成一个上百页的pdf,这个pdf包含表格、折线图、图片等,且横竖幅页面交叉 bookjs-easy官网的文档对于第一次看的人来说并不友好(建议第......
  • ModbusTCP从站(服务端)扫描工具 python实现
    扫描指定IP网络下,有哪些modbusTCP服务端[1-247]frompymodbus.clientimportModbusTcpClientfrompymodbus.exceptionsimportModbusIOException,ConnectionException,NoSuchSlaveExceptionimporttimedefread_holding_registers(client,slave_address):""&quo......
  • 网站应用微信登录 DEMO
    <!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0"><title>网站应用微信登录DEMO</title&g......
  • 网站应用微信登录 DEMO
    <!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0"><title>网站应用微信登录DEMO</title&g......
  • HTTP请求使用http、socks代理demo,包含有认证和无认证
    packagecn.daenx.myadmin.email.utils;importcn.hutool.http.HttpRequest;importcn.hutool.http.HttpUtil;importjava.net.*;/***HTTP请求使用http、socks代理demo,包含有认证和无认证**@authorDaenMax*/publicclassHttpProxyReqDemo{publicstatic......