首页 > 其他分享 >基于SpringBoot+Netty实现即时通讯(IM)功能

基于SpringBoot+Netty实现即时通讯(IM)功能

时间:2023-10-18 15:26:37浏览次数:43  
标签:Netty SpringBoot netty ctx IM io import new channel

简单记录一下实现的整体框架,具体细节在实际生产中再细化就可以了。

第一步 引入netty依赖

SpringBoot的其他必要的依赖像Mybatis、Lombok这些都是老生常谈了 就不在这里放了

       <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.85.Final</version>
        </dependency>

 

第二步 接下来就是准备工作。

消息服务类(核心代码) 聊天服务的功能就是靠这个类的start()函数来启动的 绑定端口8087 之后可以通socket协议访问这个端口来执行通讯

import com.bxt.demo.im.handler.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Description: 即时通讯服务类
 * @author: bhw
 * @date: 2023年09月27日 13:44
 */
@Slf4j
public class IMServer {
  // 用来存放连入服务器的用户集合 public static final Map<String, Channel> USERS = new ConcurrentHashMap<>(1024);   // 用来存放创建的群聊连接 public static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static void start() throws InterruptedException { log.info("IM服务开始启动"); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); // 绑定端口 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 添加http编码解码器 pipeline.addLast(new HttpServerCodec()) //支持大数据流 .addLast(new ChunkedWriteHandler()) // 对http消息做聚合操作 FullHttpRequest FullHttpResponse .addLast(new HttpObjectAggregator(1024*64)) //支持websocket .addLast(new WebSocketServerProtocolHandler("/")) .addLast(new WebSocketHandler()); } }); ChannelFuture future = bootstrap.bind(8087).sync(); log.info("服务器启动开始监听端口: {}", 8087); future.channel().closeFuture().sync(); //关闭主线程组 bossGroup.shutdownGracefully(); //关闭工作线程组 workGroup.shutdownGracefully(); } }

 

 创建聊天消息实体类

/**
 * @Description: 聊天消息对象 可以自行根据实际业务扩展
 * @author: seizedays
 */
@Data
public class ChatMessage extends IMCommand {
    //消息类型
    private Integer type;
    //消息目标对象
    private String target;
    //消息内容
    private String content;

}

连接类型枚举类,暂时定义为建立连接、发送消息和加入群组三种状态码

@AllArgsConstructor
@Getter
public enum CommandType {

    //建立连接
    CONNECT(10001),
    //发送消息
    CHAT(10002),
    //加入群聊
    JOIN_GROUP(10003),
    ERROR(-1)
    ;


    private Integer code;

    public static CommandType match(Integer code){
        for (CommandType value : CommandType.values()) {
            if (value.code.equals(code)){
                return value;
            }
        }
        return ERROR;
    }

}

命令动作为聊天的时候 消息类型又划分为私聊和群聊两种 枚举类如下:

@AllArgsConstructor
@Getter
public enum MessageType {

    //私聊
    PRIVATE(1),
    //群聊
    GROUP(2),
    ERROR(-1)
    ;
    private Integer type;

    public static MessageType match(Integer code){
        for (MessageType value : MessageType.values()) {
            if (value.type.equals(code)){
                return value;
            }
        }
        return ERROR;
    }

}

 

创建连接请求的拦截器

import com.alibaba.fastjson2.JSON;
import com.bxt.common.vo.Result;
import com.bxt.demo.im.cmd.IMCommand;
import com.bxt.demo.im.server.IMServer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * @Description: 用户连接到服务端的拦截器
 * @author: bhw
 * @date: 2023年09月27日 14:28
 */
public class ConnectionHandler {
    public static void execute(ChannelHandlerContext ctx, IMCommand command) {
        if (IMServer.USERS.containsKey(command.getNickName())) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(command.getNickName() + "已经在线,不能重复连接"))));
            ctx.channel().disconnect();
            return;
        }

        IMServer.USERS.put(command.getNickName(), ctx.channel());

        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("系统消息:" + command.getNickName() + "与服务端连接成功"))));

        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success(JSON.toJSONString(IMServer.USERS.keySet())))));
    }
}

加入群组功能的拦截器

/**
 * @Description: 加入群聊拦截器
 * @author: bhw
 * @date: 2023年09月27日 15:07
 */
public class JoinGroupHandler {
    public static void execute(ChannelHandlerContext ctx) {
        try {
            IMServer.GROUP.add(ctx.channel());
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("加入系统默认群组成功!"))));
        } catch (Exception e) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常"))));
        }

    }
}

发送聊天到指定对象的功能拦截器

import com.alibaba.excel.util.StringUtils;
import com.alibaba.fastjson2.JSON;
import com.bxt.common.vo.Result;
import com.bxt.demo.im.cmd.ChatMessage;
import com.bxt.demo.im.cmd.MessageType;
import com.bxt.demo.im.server.IMServer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Objects;

/**
 * @Description: 聊天拦截器
 * @author: bhw
 * @date: 2023年09月27日 15:07
 */
public class ChatHandler {
    public static void execute(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        try {
            ChatMessage message = JSON.parseObject(frame.text(), ChatMessage.class);
            MessageType msgType = MessageType.match(message.getType());

            if (msgType.equals(MessageType.PRIVATE)) {
                if (StringUtils.isBlank(message.getTarget())){
                    ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,请选择消息发送对象"))));
                    return;
                }
                Channel channel = IMServer.USERS.get(message.getTarget());
                if (Objects.isNull(channel) || !channel.isActive()){
                    ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,对方不在线"))));
                    IMServer.USERS.remove(message.getTarget());
                    return;
                }
                channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("私聊消息(" + message.getTarget() + "):" + message.getContent()))));

            } else if (msgType.equals(MessageType.GROUP)) {
                IMServer.GROUP.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("群消息:发送者(" + message.getNickName() + "):" + message.getContent()))));
            }else {
                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:不支持的消息类型"))));
            }


        } catch (Exception e) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常"))));
        }

    }
}

最后是websocket拦截器 接收到客户端的指令后选择对应的拦截器实现相应的功能:

import com.alibaba.fastjson2.JSON;
import com.bxt.common.vo.Result;
import com.bxt.demo.im.cmd.CommandType;
import com.bxt.demo.im.cmd.IMCommand;
import com.bxt.demo.im.server.IMServer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;

/**
 * @Description: websocket拦截器
 * @author: bhw
 * @date: 2023年09月27日 13:59
 */
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        System.out.println(frame.text());
        try {
            IMCommand command = JSON.parseObject(frame.text(), IMCommand.class);
            CommandType cmdType = CommandType.match(command.getCode());
            if (cmdType.equals(CommandType.CONNECT)){
                ConnectionHandler.execute(ctx, command);
            } else if (cmdType.equals(CommandType.CHAT)) {
                ChatHandler.execute(ctx,frame);
            } else if (cmdType.equals(CommandType.JOIN_GROUP)) {
                JoinGroupHandler.execute(ctx);
            } else {
                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("不支持的code"))));
            }
        }catch (Exception e){
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(e.getMessage()))));
        }

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 当连接断开时被调用
        Channel channel = ctx.channel();
        // 从 USERS Map 中移除对应的 Channel
        removeUser(channel);
        super.channelInactive(ctx);
    }

    private void removeUser(Channel channel) {
        // 遍历 USERS Map,找到并移除对应的 Channel
        IMServer.USERS.entrySet().removeIf(entry -> entry.getValue() == channel);
    }
}

第三步 启动服务

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
       // 启动IM服务
        try {
            IMServer.start();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

现在 客户端通过socket协议访问8087端口即可实现基本的聊天室功能了!

标签:Netty,SpringBoot,netty,ctx,IM,io,import,new,channel
From: https://www.cnblogs.com/seizedays/p/17772433.html

相关文章

  • Lock wait timeout exceeded; try restarting transaction临时解决办法
    错误日志Errorupdatingdatabase.Cause:com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException:Lockwaittimeoutexceeded;tryrestartingtransaction###Theerrormayinvolvecom.zhonghe.userim.dao.mysql.CrmRoleDao.updateById-Inline###SQL:UPDATE......
  • 2023.10.13NOIPSIM3总结
    T1卡牌赛时打了一个\(\Omicron(nm)\)的暴力,拿到30分。我们发现第\(i\)张牌对BOSS造成的伤害为$att_i*\lceil\frac{hp_i}{Att}\rceil$,那么考虑以卡牌血量值域为下标开一个桶,储存相同血量的卡牌的\(\sumatt\)。对于每一级BOSS的攻击力,我们都可以在桶上根据\(\lceil......
  • [914] In Python's datetime library, you can format dates using the strftime() me
    InPython'sdatetimelibrary,youcanformatdatesusingthestrftime()method.Thismethodallowsyoutocreateaformattedstringrepresentationofadatetimeobject,specifyingtheformatyouwant.Here'showyoucanformatadateusingstrft......
  • TDengine 资深研发整理:基于 SpringBoot 多语言实现 API 返回消息国际化
    作为一款在Java开发社区中广受欢迎的技术框架,SpringBoot在开发者和企业的具体实践中应用广泛。具体来说,它是一个用于构建基于Java的Web应用程序和微服务的框架,通过简化开发流程、提供约定大于配置的原则以及集成大量常用库和组件,SpringBoot能够帮助开发者更快速、更高效地......
  • SpringBoot限制接口访问频率
    功能说明可以通过注解快速实现类似一段时间内仅可以搜索N次,或一段时间内只能点赞N次,以及一段时间内尝试登陆多次即被禁止一小时等类似功能。github地址中文文档第一步:添加Maven依赖<dependency><groupId>io.github.liuye744</groupId><artifactId>simpleAuth-sprin......
  • 基于 SpringBoot+Hikvision SDK 远程查看配置海康网络摄像头配置
    写在前面工作中遇到,简单整理理解不足小伙伴帮忙指正对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧——赫尔曼·黑塞《德米安》海康设备通过SDK查看......
  • Angular Routing Primary
    WhyRouting1.通过Url表示应用的状态2.让应用变得可以link,即直接通过url来链接到某个指定的页面3.可用通过用户角色来确定是否可以访问该页面4.*可通过路由来模块化加载应用*ConfiguringRoutesAutocreate:可以通过angular-cli来创建基本的routing相关Ma......
  • linear optimization
    \(\begin{array}{ll}\operatorname{minimize}&f_0(x)\\\text{subjectto}&f_i(x)\leqb_i,\quadi=1,\ldots,m.\end{array}\)Asanimportantexample,theoptimizationproblem(1.1)iscalledalinearprogramiftheobjectiveandc......
  • SpringBoot 缓存之 @Cacheable 详细介绍
    简介缓存介绍Spring从3.1开始就引入了对Cache的支持。定义了org.springframework.cache.Cache和org.springframework.cache.CacheManager接口来统一不同的缓存技术。并支持使用JCache(JSR-107)注解简化我们的开发。其使用方法和原理都类似于Spring对事务管理的支持。Sp......
  • springboot连接rabbitmq网络异常时,不能自动重连的问题
    在rabbitmq官网的javaapi处:https://www.rabbitmq.com/api-guide.html#recovery介绍了java连接rabbitmq异常恢复的问题。其中提及了如下内容Asofversion4.0.0oftheJavaclient,automaticrecoveryisenabledbydefault(andthustopologyrecoveryaswell).也就是......