首页 > 编程语言 >java使用Netty实现TCP收发消息的例子,多线程并且含断线自动重连

java使用Netty实现TCP收发消息的例子,多线程并且含断线自动重连

时间:2024-07-04 15:56:20浏览次数:1  
标签:Netty 重连 group netty bootstrap io import 多线程 public

需求:有一个TCP的服务,需要使用Netty开发一个TCP连接并收发消息的程序。要求 多线程并且含断线自动重连 能力。

组织结构,使用 Java Maven 编程方式

功能还包含 读取配置文件 和 log4j2写日志 部分

 完整代码:

App.java

package com.LSpbxServer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ResourceBundle;

/**  pbxServer  */
public class App implements INettyClientEventListener
{
    protected static final Logger logger = LoggerFactory.getLogger(App.class);

    public static void main( String[] args )
    {
        //System.out.println( "Hello World!" );
        logger.info("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"  );
        logger.info("@@@@@@@@@@@@@@@@@@@ 启动 pbxServer @@@@@@@@@@@@@@@@@@@@@@@@@@@@@"  );
        logger.info("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"  );
        //读取参数
        ResourceBundle resource = ResourceBundle.getBundle("config");
        String strPbxIp = resource.getString("pbxIp");//
        String strPbxPort = resource.getString("pbxPort");
        String strApiPwd = resource.getString("apiPwd");
        //System.out.println( "pbx "+strPbxIp+" " + strPbxPort +" "+ strApiPwd +"---------- ");
        logger.info("pbx 参数: " + strPbxIp + " " + strPbxPort + " " + strApiPwd + "  ---------- ");

        try {
            new NettyClient().connect();

        }catch (Exception ex) {
            System.out.println("Error1 " + ex.toString());
        }
    }

    //pbx收到消息事件----------------------------------------------
    public void NettyClientEvent_RecMessage(String message) {
        // 执行你希望执行的逻辑
        System.out.println("Received message: " + message);
    }

}
NettyClient.java
package com.LSpbxServer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;


public class NettyClient {
    private static final String HOST = "192.168.1.150"; // 服务器地址
    private static final int PORT = 8444; // 服务器端口

    public void connect() {
        // 创建主从线程组
        final EventLoopGroup group = new NioEventLoopGroup();
        // 创建客户端引导类
        final Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class) // 使用NIO传输
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 配置通道流水线
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder()); // 添加字符串解码器
                            pipeline.addLast(new StringEncoder()); // 添加字符串编码器
                            pipeline.addLast(new NettyClientHandler(bootstrap, group)); // 添加客户端处理器

                        }
                    });
            // 发起连接
            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            // 等待连接关闭
            future.channel().closeFuture().sync();

        } catch (Exception ex) {
            try {
                System.out.println("连接pbx失败 NettyClient().connect() 20秒后 再次连接 =========================");

                for (int i = 1; i <= 20; i++) {
                    System.out.println("连接pbx失败 NettyClient().connect()  线程 " + Thread.currentThread().getName() + "   " + i);
                    // 线程休眠1秒
                    Thread.sleep(1000);
                }
                new NettyClient().connect();
            } catch (Exception ex1) {
                System.out.println("再次尝试连接出错,退出程序=========================");
            }

        } finally {
            // 优雅地退出,释放线程池资源
            System.out.println("优雅地退出,释放线程池资源=========");
            group.shutdownGracefully();
            //throw new Exception("这是一个检查型异常");
        }

    }

    public void reRunClient() {
        try {
            System.out.println(" pbx连接已断开 " + " 5秒后 再次连接pbx ");
            // 线程休眠5秒
            Thread.sleep(5000);
            System.out.println(" " + " 开始连接pbx--------- ");
            connect();//再次连接pbx

        } catch (Exception ex) {
            System.out.println("重新连接pbx时出现异常 " + ex.toString());
        }
    }

}

NettyClientHandler.java

package com.LSpbxServer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    private final Bootstrap bootstrap;
    private final EventLoopGroup group;

    private INettyClientEventListener nettyClientEventListener;

    public NettyClientHandler(Bootstrap bootstrap,EventLoopGroup group ) {
        this.bootstrap = bootstrap;
        this.group=group;

        this.nettyClientEventListener = new App();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws InterruptedException {
        //当通道活跃时发送消息,第一次连接成功时 执行
        //ctx.writeAndFlush("Hello, Netty Server!");
        try {
            String strMsg = "Action: login\r\n Username: api\r\n Secret: F6jFt82g\r\n Version: 2.0.0\r\n \r\n\r\n";
            ctx.writeAndFlush(strMsg);//发消息
            System.out.println("开始连接pbx 发消息 " + strMsg);

        }catch (Exception ex){
            System.out.println( "NettyClientHandler channelActive() 连接时出错 "+ ex.toString() );
        }

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        //当通道不活跃时,发出连接断开时执行
        try {
            System.out.println("与pbx连接断开,20秒后再次尝试连接=========================");
            for (int i = 1; i <= 20; i++) {
                System.out.println("channelInactive() 线程 " + Thread.currentThread().getName() + "   " + i);
                // 线程休眠1秒
                Thread.sleep(1000);
            }
            new NettyClient().connect();

        } catch (Exception ex) {
            System.out.println("NettyClientHandler channelInactive() 与pbx连接断开,20秒后再次尝试连接时,发生异常 "+ ex.toString() );
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //收到pbx消息
        try {
            // 处理接收到的消息
            System.out.println("收到pbx消息: " + msg);
//触发回调函数
            nettyClientEventListener.NettyClientEvent_RecMessage(msg.toString());
        }catch (Exception ex){
            System.out.println( "NettyClientHandler channelRead() 收到pbx消息时,出现异常 "+ ex.toString() );
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //与pbx的连接出现异常,自动关闭连接
        try {
            System.out.println("与pbx的连接出现异常,自动关闭连接=========================");
            // 出现异常时关闭连接
            cause.printStackTrace();
            ctx.close();

        }catch (Exception ex){
            System.out.println( "NettyClientHandler channelRead() 收到pbx消息时,出现异常 "+ ex.toString() );
        }

    }
}

INettyClientEventListener.java

package com.LSpbxServer;

public interface INettyClientEventListener {
    void NettyClientEvent_RecMessage(String message);
}

config.properties

pbxIp=192.168.1.150
pbxPort=8000
apiPwd=123456

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
<!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出-->
<!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数-->
<Configuration status="info" monitorInterval="60">
    <!--定义了两个常量方便后面复用 -->
    <properties>
        <!--生成的日志文件目录地址  -->
        <property name="LOG_HOME">logs/</property>
        <!--日志文件名称 -->
        <property name="FILE_NAME">LogFile</property>
    </properties>
    <!--先定义所有的appender-->
    <Appenders>
        <!-- 定义控制台输出 -->
        <Console name="Console" target="SYSTEM_OUT">
            <!--输出日志的格式-->
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %l - %msg%n" />
        </Console>
        <!--满足一定条件后,就重命名原日志文件用于备份,并从新生成一个新的日志文件 -->
        <!--fileName:指定当前日志文件的位置和文件名称   filePattern:指定当发生Rolling时,文件的转移和重命名规则-->
        <RollingFile name="uleWalletEjbLogFile"
                     fileName="${LOG_HOME}/${FILE_NAME}.log"
                     filePattern="${LOG_HOME}/$${date:yyyy-MM-dd}/${FILE_NAME}-%d{yyyy-MM-dd HH}-%i.log">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %l - %msg%n" />
            <Policies>
                <!--TimeBasedTriggeringPolicy这个配置需要和filePattern结合使用,
                注意filePattern中配置的文件重命名规则是${FILE_NAME}-%d{yyyy-MM-dd HH-mm}-%i,
                最小的时间粒度是mm,即分钟,TimeBasedTriggeringPolicy指定的size是1,结合起来就是每1分钟生成一个新文件。
                如果改成%d{yyyy-MM-dd HH},最小粒度为小时,则每一个小时生成一个文件。  -->
                <TimeBasedTriggeringPolicy interval="1" />
                <!--SizeBasedTriggeringPolicy  指定当文件体积大于size指定的值时,触发Rolling 2048 MB -->
                <SizeBasedTriggeringPolicy size="20 MB" />
            </Policies>
            <DefaultRolloverStrategy max="30" />
        </RollingFile>
    </Appenders>
    <Loggers>
        <Root level="DEBUG">
            <AppenderRef ref="Console" />
            <AppenderRef ref="uleWalletEjbLogFile" />
        </Root>
        <!-- 过滤掉 指定类的debug -->
        <Logger name="io.netty" level="info" additivity="false">
            <AppenderRef ref="Console" />
        </Logger>
        <Logger name="org.eclipse.jetty" level="info" additivity="false">
            <AppenderRef ref="Console" />
        </Logger>
        <!-- 收到坐席消息 -->
        <Logger name="com.joincall.j3c.agentservice.AbstractAgentConnector" level="info" additivity="false">
            <AppenderRef ref="Console" />
        </Logger>
        <!-- 每5秒向pbx请求队列状态 -->
        <Logger name="com.joincall.j3c.pbxlangshi.pbxaction" level="info" additivity="false">
            <AppenderRef ref="Console" />
            <AppenderRef ref="uleWalletEjbLogFile" />
        </Logger>

    </Loggers>
</Configuration>

pom.xml 主要代码

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.1.30.Final</version>
      </dependency>

    <!-- log4j2 -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.3</version>
      <!-- <scope>provided</scope> -->
    </dependency>

    <!-- slf4j核心包 -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.2</version>
    </dependency>
    <!--用于与slf4j保持桥接-->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.3</version>
    </dependency>
  </dependencies>

配置文件路径

    </plugins>
 </pluginManagement>

    <resources>
      <resource>
        <filtering>true</filtering>
        <directory>src/main/resources</directory>
        <includes>
          <include>**/**.properties</include>
          <include>**/**.xml</include>
        </includes>
        <!-- <targetPath>/resources</targetPath> -->
      </resource>
    </resources>

  </build>
</project>

 

标签:Netty,重连,group,netty,bootstrap,io,import,多线程,public
From: https://www.cnblogs.com/hailexuexi/p/18283984

相关文章

  • 多线程编程的基本概念,C++标准库中的多线程支持(std::thread,std::async),如何处理线程同步
    多线程编程在现代计算机系统中非常重要,因为它能够使程序同时执行多个操作,提高计算效率。以下是多线程编程的基本概念及如何在C++标准库中使用std::thread和std::async进行多线程编程,同时处理线程同步和并发问题。多线程编程的基本概念线程(Thread):线程是一个轻量级的进程,是......
  • JAVA多线程快速入门
    什么是多线程概述线程线程是操作系统能够进行运算调度的最小单位它被包含在进程之中,是进程中的实际运作单位简单理解应用软件中互相独立,可以同时运行的功能进程进程是程序的基本执行实体/系统分配资源的基本单位作用充分利用cpu提......
  • Java多线程编程
    1.进程进程是指操作系统中正在运行的程序实例,它是系统资源分配的基本单位。每个进程都拥有独立的内存空间和系统资源,可以看作是程序的一次执行过程。2.线程线程是进程中的执行单元,也被称为轻量级进程(LightWeightProcess)。一个进程可以包含多个线程,这些线程共享进......
  • springboot项目国产化适配,jar改war包碰到的坑-tomcat版本要适配(非法访问:此Web应用程序
    项目原来是jar包运行,国产化适配要改成war包。可以参考https://blog.csdn.net/NAMELZX/article/details/138123405或者其他jar 改成 war 的文章。改成war后,在本地tomcat8上运行,一直报org.apache.catalina.loader.WebappClassLoaderBase.checkStateForResourceLoading非法......
  • Winform SynchronizationContext多线程更新画面控件
    SynchronizationContext在通讯中充当传输者的角色,实现功能就是一个线程和另外一个线程的通讯。需要注意的是,不是每个线程都附加SynchronizationContext这个对象,只有UI线程是一直拥有的。故获取SynchronizationContext也只能在UI线程上进行SynchronizationContextcontex......
  • Linux多进程和多线程(一)-进程的概念和创建
    进程进程的概念进程的特点如下进程和程序的区别LINUX进程管理getpid()getppid()进程的地址空间虚拟地址和物理地址进程状态管理进程相关命令pstoppstreekill进程的创建并发和并行fork()父子进程执行不同的任务创建多个进程进程的退出exit()和_exit()exit()函数......
  • 深入理解Qt多线程编程(QtConcurrent)
    多线程编程在现代软件开发中变得越来越重要,它能够提高应用程序的响应速度和处理性能。在Qt框架中,除了QThreadPool,QtConcurrent也是一个强大的工具,用于简化和管理多线程编程。目录概述接口详解QtConcurrent::runQtConcurrent::mapQtConcurrent::mappedQtConcurrent::filt......
  • Java_多线程:实现多线程
    Java中实现多线程的常用方式:继承Thread类实现Runnable接口实现Callable接口(JDK>=1.5)线程池方式创建实现Runnable接口与Callable接口的区别实现Runnable接口和Callable接口的方式基本相同,不过Callable接口里定义的方法返回值,可以声明抛出异常。Runnable和Callable与......
  • java多线程-锁的介绍
    多线程中常用锁一、锁的概念二、锁的类型2.1互斥锁(也称排它锁)2.1.1Synchronized和Lock2.1.2ReentrantLock(可重入锁)2.1.3公平锁2.1.4非公平锁2.1.5中断锁2.2共享锁2.3读写锁三、悲观锁和乐观锁3.1悲观锁3.2乐观锁3.3CAS算法四、锁竞争一、锁的概念在多......
  • 在多线程并发操作中处理大量文件时,以下是一些关键的底层原理和技术:
    在多线程并发操作中处理大量文件时,以下是一些关键的底层原理和技术:1.文件句柄管理每个线程需要独立地管理文件句柄,文件句柄是操作系统提供的用于标识和访问文件的资源。在Windows环境下,使用CreateFile函数可以打开文件并获得文件句柄。每个文件句柄具有其自己的上下文和状态,......