首页 > 其他分享 >SpringBoot Netty socket使用

SpringBoot Netty socket使用

时间:2023-08-07 13:01:26浏览次数:45  
标签:Netty SpringBoot netty io new import public channel socket

SpringBoot Netty socket使用

Netty是由JBOSS提供的一个java开源框架,现为Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

这里springBoot整合起来使用测试,性能怎么的不怎么了解,至少能用

maven引用依赖

<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.42.Final</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>

配置

这里测试看看启动两个socket server,所以两个端口

nettysocket:
    port:  32768
    port2: 32769

Netty 服务

MySoketListener

package com.ld.test.socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

@Component
//@WebListener
public class MySoketListener implements ServletContextListener {
    private static final Logger log = LoggerFactory.getLogger(MySoketListener.class);

    @Value("${nettysocket.port}")
    private Integer port;

    @Value("${nettysocket.port2}")
    private Integer port2;

    private MyTestNettyServer myTestNettyServer;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        log.info("1.启动时,开启监听========================");
        if (myTestNettyServer == null) {
            log.info("2.启动时,MyTestNettyServer为null,启动Netty socket服务========================");

            log.info("=====MyTestNettyServer 端口为:" + port);           

            Thread thread = new Thread(new MyTestNettyServer(port));
            thread.start();

            //启动了别一个服务后 也是能共用ServerHandler.ChannelGroup进行统一对多个端进行转发消息
            //这里注意:不能使用在MyTestNettyServer 使用static 静态端口号,会被覆盖的
            //可以再启动一个服务
            log.info("=====MyTestNettyServer 端口为:" + port2);
            Thread thread2 = new Thread(new MyTestNettyServer(port2));
            thread2.start();

        }
    }

    // 应用关闭时,此方法被调用
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        log.info("23========================");
    }

}

MyTestNettyServer

package com.ld.test.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyTestNettyServer implements Runnable {
    /**
     * 异常输出
     */
    private static final Logger log = LoggerFactory.getLogger(MyTestNettyServer.class);
    private static Integer DEFAULT_PORT = 58765;
    private Integer port = DEFAULT_PORT;
    private String serverName = "";


    //默认
    public MyTestNettyServer() {
        port = DEFAULT_PORT;
        serverName = "srv(" + port + ")";
    }

    public MyTestNettyServer(Integer port) {
        this.port = port;
        serverName = "srv(" + this.port + ")";
    }

    /**
     * soket监听
     */
    // public static void soketListener() {
    public void soketListener() {
        log.info(serverName + "当前SOCKET NettyServer 端口为: port=" + port);
        Integer portNow = port;
        EventLoopGroup bossExecutors = new NioEventLoopGroup();
        EventLoopGroup workExecutors = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossExecutors, workExecutors)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 65535, 65535))
                    .childHandler(new ServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(portNow).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("NettyServer 端口为:" + portNow + " 启动出现异常, 异常内容为:" + e.getMessage() + "========================");

        } finally {
            log.error("NettyServer 服务关闭========================");
            bossExecutors.shutdownGracefully();
            workExecutors.shutdownGracefully();
        }

    }

    /**
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        log.info("多线程启动Netty sserver========================");
        // CallNettyServer.soketListener();
        this.soketListener();
    }


}

ServerHandler

package com.ld.test.socket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


public class ServerHandler extends SimpleChannelInboundHandler<String> {
    /**
     * 日志输出
     */
    private static final Logger log = LoggerFactory.getLogger(MyTestNettyServer.class);
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // private ConcurrentMap<String,T> socketConnectionMap = new ConcurrentHashMap<>();

    //存放 key=channel 及value=DF
    private static ConcurrentMap<String, String> channelMap = new ConcurrentHashMap<>();


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("====");
        System.out.println("channelMap客户端:" + JSON.toJSONString(channelMap));

        System.out.println("====");
        System.out.println(msg);
        System.out.println("====");


    }

    //给http 控制器调用的 转发消息的方法,通过 channelGroup缓存的Channel进行转发
    public static void sendMessage(String msg) throws Exception {
        JSONObject retJson = new JSONObject();
        retJson.put("code", "9999");
        retJson.put("message", "-");
        try {
            System.out.println("===================");
            System.out.println("socet sendMessage收到报文");
            System.out.println(msg);
            System.out.println("===================");

        } catch (Exception e) {
            System.out.println("转换信息出错,收到内容:" + msg);
            e.printStackTrace();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "channelActive");

        //统一给所有客户端发送一个上线通知
        //  channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经上线channelActive"+"\n");

        channelGroup.add(channel);
        System.out.println("当前socket连接数 channelGroup.size():" + channelGroup.size());

        //
        // channelMap.put(channel.id().asLongText(),"13001");

    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "-----channelInactive");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "handlerAdded");
        // channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经上线"+"\n");
        channelGroup.add(channel);
        //channelMap.put(channel.id().asLongText(),"1001");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        Channel channel = ctx.channel();

        System.out.println(channel.remoteAddress() + "handlerRemoved");

        channelMap.remove(channel.id().asLongText());

        // channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经下线"+"\n");
        System.out.println("当前socket连接数channelGroup.size():" + channelGroup.size());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


ServerInitializer

package com.ld.test.socket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ServerHandler());
    }
}

SpringBoot服务

package com.ld.test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
@SpringBootApplication
@ServletComponentScan
public class NettySocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(NettySocketApplication.class, args);
    }
}

启动应用

SpringBoot Netty socket使用_SpringBoot

测试

package com.ld.test.test;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.Socket;
import java.util.UUID;

@Slf4j
public class SocketClient1 {
    private static Logger log = LoggerFactory.getLogger(SocketClient1.class);

    public static void main(String[] args) throws IOException {
        // String host = "192.168.2.156";
        // int port = 8068;

        String host = "127.0.0.1";
        int port = 32768;


        //与服务端建立连接
        Socket socket = new Socket(host, port);
        socket.setOOBInline(true);

        //建立连接后获取输出流
        DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
        OutputStreamWriter outSW = new OutputStreamWriter(outputStream, "UTF-8");
        BufferedWriter bw = new BufferedWriter(outSW);


        DataInputStream inputStream = new DataInputStream(socket.getInputStream());
        InputStreamReader inSR = new InputStreamReader(inputStream, "UTF-8");
        BufferedReader br = new BufferedReader(inSR);


        String uuid = UUID.randomUUID().toString();
        uuid = "用户1" + "\r\n";
        log.info("uuid: {}", uuid);
        outputStream.write(uuid.getBytes());
        while (true) {
            //接收消息循环
        }

    }
}

SpringBoot Netty socket使用_socket_02

SpringBoot Netty socket使用_SpringBoot_03

标签:Netty,SpringBoot,netty,io,new,import,public,channel,socket
From: https://blog.51cto.com/u_12668715/6992717

相关文章

  • Mitsubishi 三菱FX5U间socket通讯
    Socket通信Socket通信是指应用程序利用Socket接口实现的网络通信,用于在不同的计算机之间传输数据。Socket是一种标准的API(应用程序接口),可用于在不同的计算机之间传输数据,它支持TCP、UDP等多种网络协议。在Socket通信中,客户端和服务器通过Socket接口进行通信。客户端创建......
  • Docker部署SpringBoot项目微服务
     登录linux服务器,进入目录  usr/local目录,创建两个文件夹使用alt+p快捷键,将windows目录中生成的jar包,拖到linux桌面使用cp命令,将linux桌面中jar包,复制到指定目录cplesson01-0.0.1-SNAPSHOT.jar/usr/local/docker/lesson01使用vi命令,编辑dockerfile,制作镜......
  • Docker部署SpringBoot项目准备
     创建一个SpringBoot项目测试程序,在Windows中执行java  -jar  xxx.jar查看效果java-jarlesson01-0.0.1-SNAPSHOT.jar主要代码1、pom.xml文件<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0&q......
  • Springboot - mybatis
    目录入门入门1.准备工作(创建springboot工程,数据库表user,实体类user)2.引入Mybatis的相关依赖,配置Mybatis3.编写SQL语句(注解/xml)创建工程时直接引入mybatis依赖:dao层编写mapper:packagecom.chuangzhou.dao;importcom.chuangzhou.pojo.User;importorg.apache.ibati......
  • 一种SpringBoot下Scheduler定时任务优雅退出方案
    背景近期业务中有一个定时任务发现每次服务部署时,偶发性的会触发问题,这里记录一下问题的跟进解决。分析现象该定时任务每2分钟执行一次,完成数据的更新处理。同时服务部署了多个服务器节点,为保证每次只有一个服务器节点上的任务在跑,引入了基于Redis缓存的分布式锁。示例源码@Schedu......
  • 【springBoot】搭建多模块项目指南
    二、多环境配置内容①在reources目录下建立多个配置文件②在pom.xml配置profile节点③mvn命令行打包命令,基于参数进行打包替换占位符mvn-Ucleaninstall-Dmaven.test.skip=true-Pprod④将target目录下的jar包copy出来,执行命令启动,使用的就是不同环境的配置进行启动的......
  • 基于springboot城市便捷酒店管理系统
    随着科技不断的进步,系统管理也都将通过计算机进行整体智能化操作。对于酒店预订网站所牵扯的管理及数据保存都是非常多的,例如管理员:用户管理、客房管理、栏目管理、内容管理、轮播图管理、订单管理、数据统计等,这给管理者的工作带来了巨大的挑战,面对大量的信息,传统的管理系统,都是通......
  • 记录一下 搭建springboot,springCloud,springCloudAlibaba,nacos
    1,首先创建一个空项目里面有两个服务一个提供者一个调用者 2,父工程的使用依赖 以及springBoot的父依赖//springboot父工程<parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId>......
  • SpringBoot对接OpenAI
    SpringBoot对接OpenAI随着人工智能技术的飞速发展,越来越多的开发者希望将智能功能集成到自己的应用中,以提升用户体验和应用的功能。OpenAI作为一家领先的人工智能公司,提供了许多先进的自然语言处理和语言生成模型,其中包括深受欢迎的GPT-3.5模型。本文将介绍如何利用SpringBoot框......
  • SpringBoot - IOC&DI
    目录三层架构三层架构controller:控制层,接收前端请求,对请求进行处理,并响应数据service:业务逻辑层,处理具体的业务逻辑dao:数据访问层(DataAccessObject)(持久层),负责访问操作,包括数据得增删改查员工案例重构:controller:packagecom.chuangzhou.controller;importcom.chu......