首页 > 其他分享 >SignalR实时消息推送

SignalR实时消息推送

时间:2024-09-10 10:06:24浏览次数:1  
标签:userId 实时 SignalR token var new 推送 public

后端

创建一个Hub类,重写连接和断连方法

ChatHub

记录下每一位登录人连接SignalR的信息至Redis中

引用的Microsoft.AspNetCore.App框架

public class ChatHub : Hub
{
    //ConcurrentDictionary<string, HubUser> concurrentDictionary = new ConcurrentDictionary<string, HubUser>();
    private ICurrentLoginInfoService _currentLoginInfoService;
    private IUserSignalRRedisService  _userSignalRRedisService;

    public ChatHub(ICurrentLoginInfoService currentLoginInfoService, IUserSignalRRedisService userSignalRRedisService)
    {
        _currentLoginInfoService = currentLoginInfoService;
        _userSignalRRedisService = userSignalRRedisService;
    }



    /// <summary>
    /// 连接
    /// </summary>
    /// <returns></returns>
    public override async Task OnConnectedAsync()
    {
        //var client = concurrentDictionary.TryGetValue(Context?.ConnectionId, out HubUser? hubUser);
        var client = await _userSignalRRedisService.GetUserConnection(_currentLoginInfoService.UserId);
        if (client == null)
        {
            client = new HubUser()
            {
                ConnectionID = Context.ConnectionId,
                UserId = _currentLoginInfoService.UserId,
                Name = _currentLoginInfoService.UserName
            };
            //concurrentDictionary.GetOrAdd(newUser.ConnectionID, newUser);
        }
        else
        {
            client.ConnectionID = Context.ConnectionId;
        }
        await _userSignalRRedisService.SetUserConnection(_currentLoginInfoService.UserId, client);
        await base.OnConnectedAsync();
    }

    /// <summary>
    /// 断开连接
    /// </summary>
    /// <param name="exception"></param>
    /// <returns></returns>
    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        //var client = concurrentDictionary.TryGetValue(Context.ConnectionId, out HubUser? hubUser);
        var hubUser = await _userSignalRRedisService.GetUserConnection(_currentLoginInfoService.UserId);
        if (hubUser != null)
        {
           await  _userSignalRRedisService.RemoveUserConnection(_currentLoginInfoService.UserId);
            //concurrentDictionary.Remove(hubUser.ConnectionID, out HubUser? reVal);
        }

        await base.OnDisconnectedAsync(exception);
    }

   
}

UserSignalRRedisService

 public class UserSignalRRedisService : IUserSignalRRedisService
 {
     private IRedisServer _redisServer;

     public UserSignalRRedisService(IRedisServer redisServer)
     {
         _redisServer = redisServer;
     }
     /// <summary>
     /// 获取用户的SignalR信息
     /// </summary>
     /// <returns></returns>
     public async Task<HubUser> GetUserConnection(long userId)
     {
         var key = string.Format(GlobalConstants.UserSignalRSession, userId);
         var obj = await _redisServer.GetAsync(key);
         return obj.ToObject<HubUser>();
     }

     /// <summary>
     /// 设置用户的SignalR信息
     /// </summary>
     /// <returns></returns>
     public async Task<bool> SetUserConnection(long userId, HubUser hubUser)
     {
         var key = string.Format(GlobalConstants.UserSignalRSession, userId);
         return await _redisServer.SetAsync(key, hubUser.ToJson());
     }

     /// <summary>
     /// 删除用户的SignalR信息
     /// </summary>
     /// <param name="userId"></param>
     /// <returns></returns>
     public async Task<bool> RemoveUserConnection(long userId)
     {
         return await _redisServer.RemoveAsync(userId.ToString());
     }

 }

ICurrentLoginInfoService

当前登录人信息

 Program.cs引入SignalR

app.MapHub<ChatHub>("/messageHub");

自定义一个中间件来拦截SignalR的请求,做当前登录人信息做处理

public class SignalrInterceptionMiddleware
{
    public const string AuthorizationSchem = "Bearer";
    private readonly RequestDelegate _next;
    private readonly IUserRedisService _redisService;
    private readonly JwtOption _jwtOption;
    private readonly ILogger<SignalrInterceptionMiddleware> _logger;
    public SignalrInterceptionMiddleware(RequestDelegate next, IUserRedisService redisService, IOptions<JwtOption> options, ILogger<SignalrInterceptionMiddleware> logger)
    {
        _next = next;
        _redisService = redisService;
        _jwtOption = options.Value;
        _logger = logger;
    }

    public async Task InvokeAsync(HttpContext context)
    {
        //if(context.WebSockets.IsWebSocketRequest && context.Request.Path.StartsWithSegments("/messageHub"))
        if (context.Request.Path.StartsWithSegments("/messageHub"))
        {
            var accessToken = context.Request.Query["access_token"].ToString();
            if (!string.IsNullOrEmpty(accessToken))
            {
                var token = accessToken.Replace(AuthorizationSchem, string.Empty).Trim();
                var userId = GetUserId(token);
                var redisUser = await _redisService.GetUserSessionAsync(userId);
                if (redisUser == null) throw new CustomException(StatusCode.Unauthorized);
                if (redisUser.Token == null || !redisUser.ExpireDateTime.HasValue) throw new CustomException(StatusCode.Unauthorized);
                if (redisUser.Token != token) throw new CustomException(StatusCode.LoginOnOtherDevice);
                var identity = new ClaimsIdentity(new List<Claim>
                {
                    new Claim(GlobalConstants.UserId,userId.ToString()),
                    new Claim(GlobalConstants.UserName, redisUser.Name.ToString()),

                }, "Custom");
                context.User = new ClaimsPrincipal(identity);
                context.Items.Add(GlobalConstants.UserSession, redisUser);
            }

        }
        await _next(context);
    }
    private long GetUserId(string? token)
    {
        try
        {
            if (token.NotNull())
            {
                var claims = JWTHelper.ValiateToken(token, _jwtOption);
                if (claims != null)
                {
                    var userId = claims.FirstOrDefault(t => t.Type.Equals(GlobalConstants.UserId))?.Value;
                    if (userId.NotNull()) return Convert.ToInt64(userId);
                }
            }

        }
        catch
        {
            _logger.LogError(string.Format("解析token异常,SignalrInterceptionMiddleware=>GetUserId"));
        }
        return default(long);
    }
}

将当前登录人的信息通过token解析到Context.item中方便ICurrentLoginInfoService使用

在 Program.cs中在引入SignalR之前引入该中间件

app.UseMiddleware<SignalrInterceptionMiddleware>();
app.MapHub<ChatHub>("/messageHub");

 

 

前端

以vue为例

安装包@microsoft/signalr

npm install @microsoft/signalr

建立一个SignalRHelper的公共类

import * as signalr from '@microsoft/signalr'
export class SingalRHelper {
  connection: any
  retryCount = 0
  maxRetryAttempts = 5
  retryInterval = 5000
  url = `${import.meta.env.VITE_BASEURL}/messageHub`
  constructor() {}
  /**
   * 初始化SignalR
   * @param token
   */
  initSignalR(token: any) {
    this.connection = new signalr.HubConnectionBuilder()
      .withUrl(this.url, {
        skipNegotiation: true,
        transport: signalr.HttpTransportType.WebSockets,
        headers: { Authorization: token },
        accessTokenFactory: () => token
      })
      .build()
    this.startConnection()
  }

  /**
   * 连接
   */
  startConnection() {
    this.connection
      .start()
      .then(() => {
        this.retryCount = 0
        console.log('Connected to SignalR succeddfully!')
      })
      .catch((err: any) => {
        console.log('Error connecting to SignalR:', err)
        if (this.retryCount < this.maxRetryAttempts) {
          setTimeout(() => {
            this.retryCount++
            this.startConnection()
          }, this.retryInterval)
        }
      })
  }

  /**
   * 监听服务端消息
   * @param callback
   */
  onMessageReceived(callback: any) {
    //在连接对象上注册消息接收事件的回调函数
    this.connection.on('MessageReceived', callback)
  }
  /**
   * 断开连接
   */
  stopSignalR() {
    if (this.connection) {
      this.connection.stop()
      console.log('Disconnected from SignalR.')
    }
  }
}

注;SignalR在初始化时Url的目标地址要与后端在Program.cs中引入SignalR的地址保持一致“/messageHub”

页面调用

onMounted(() => {  
   signalR.initSignalR()
   signalR.onMessageReceived((message: any) => {
    console.log('signalR:', message)
  })
})

 

标签:userId,实时,SignalR,token,var,new,推送,public
From: https://www.cnblogs.com/sugarwxx/p/18405855

相关文章

  • el-table树形懒加载表格展开后 子节点修改数据后实时刷新
    问题描述在项目中遇到一个关于el-table的懒加载树型结构修改数据后需要刷新数据的问题,需要手动刷新页面之后才能刷新问题解决:1.首先创建map来用于存取数据,constloadMap=newMap();//存储load加载的子节点以能够编辑后更新2.在table展开子节点时,用map存下每次被加载......
  • 实现一个基于 Spring Boot 和 Vue.js 的实时消息推送系统
    在现代互联网应用中,实时消息推送已经成为一个非常重要的功能。不论是即时通讯、通知系统,还是其他需要实时互动的应用场景,消息的实时性直接影响到用户的体验和应用的效率。在这篇文章中,我将详细介绍如何使用SpringBoot和Vue.js创建一个实时消息推送系统,并确保每个用户只......
  • RTOS实时操作系统(任务运行性能分析)
    1,查看任务运行状态:X:表示任务当前正在运行(eXecuting)。B:表示任务处于阻塞状态(Blocked),这意味着任务正在等待某个事件发生,比如等待信号量、互斥量、事件组、消息队列或者定时器。R:表示任务处于就绪状态(Ready),这意味着任务已经准备好运行,但是当前没有在运行,因为它被调度器分配给其......
  • 消息推送第三方平台(个推)接入工具类
    个推官方文档:https://docs.getui.com/getui/server/rest_v2/push/首先申请个推官方账号,然后注册App获取AppID、AppKey、AppSecret、MasterSecret接入教程1、编写配置文件    修改.yml文件getui:AppID:OokKLlwRjU7tJMccVVra72AppKey:f8C6lK7OGu1115ckOfVxD8M......
  • Prometheus的拉取模式与zabbix推送模式有何区别?各有什么优缺点?
    Prometheus的拉取模式与Zabbix的推送模式在监控数据收集和处理方式上存在显著区别。以下是它们的主要区别及各自的优缺点:1.数据收集模式Prometheus拉取模式:Prometheus定期从被监控的目标(如Exporter、应用程序等)主动拉取数据。每个目标都需要暴露一个HTTP接口,Prome......
  • 淘宝返利微信机器人的消息处理与推送技术
    淘宝返利微信机器人的消息处理与推送技术大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天我们来讨论如何实现淘宝返利微信机器人的消息处理与推送技术。微信机器人可以有效地提升用户体验,通过自动化的消息处理和推送,帮助用户获取最新的返利......
  • Go实现实时文件监控功能
    一、使用库介绍fsnotify是Go语言中的一个库,用于监听文件系统的变更事件。它允许程序注册对文件系统事件的兴趣,并在这些事件发生时接收通知。fsnotify主要用来监控目录下的文件变化,如创建、删除或修改等。使用fsnotify安装fsnotify库。可以通过以下命令来安装:goget-u......
  • 大数据新视界--大数据大厂之Java 与大数据携手:打造高效实时日志分析系统的奥秘
           ......
  • 基于yolov10的行人跌倒检测系统,支持图像检测,也支持视频和摄像实时检测(pytorch框架)【py
       更多目标检测和图像分类识别项目可看我主页其他文章功能演示:基于yolov10的行人跌倒检测系统,支持图像、视频和摄像实时检测【pytorch框架、python】_哔哩哔哩_bilibili(一)简介基于yolov10的行人跌倒检测系统是在pytorch框架下实现的,这是一个完整的项目,包括代码,数据集,训......
  • 【2024年Python量化分析】为股票数据量化分析最新整理的免费获取股票实时行情数据API
    ​最近一两年,股票量化分析越来越火了,想入门这行,首先得搞定股票数据。毕竟,所有量化分析都是靠数据说话的,实时交易、历史交易、财务、基本面,这些数据咱们都得有。咱们的目标就是把这些数据里的金子挖出来,指导咱们的投资策略。​为了找数据,我可是没少折腾,自己动手写过网易、......