首页 > 其他分享 >Go使用websocket+nats队列实现聊天

Go使用websocket+nats队列实现聊天

时间:2024-06-08 16:11:37浏览次数:17  
标签:websocket room err nats Session func Go melody roomInfo

示例用的github.com/olahol/melody其实是在gorilla/websocket的基础上封装了一下

package main

import (
	"encoding/json"
	"fmt"
	"github.com/nats-io/nats.go"
	"github.com/olahol/melody"
	"log"
	"net/http"
	"runtime/debug"
	"sync"
)

// GlobalOnlineUser 管理room
type GlobalOnlineUser struct {
	Rooms   map[string]map[*melody.Session]struct{}
	RwMutex *sync.RWMutex
}

// 初始化room
func newGlobalOnlineUser() *GlobalOnlineUser {
	return &GlobalOnlineUser{
		RwMutex: &sync.RWMutex{},
		Rooms:   make(map[string]map[*melody.Session]struct{}),
	}
}

// AddUserToRoom 把用户加入到房间
func (g *GlobalOnlineUser) AddUserToRoom(s *melody.Session, room string) map[*melody.Session]struct{} {
	g.RwMutex.RLock()
	defer g.RwMutex.RUnlock()
	var roomInfo map[*melody.Session]struct{}
	//判断如果是新房间就初始化,老房间就加入
	if _, ok := g.Rooms[room]; !ok {
		roomInfo = make(map[*melody.Session]struct{})
		g.Rooms[room] = roomInfo
	} else {
		roomInfo = g.Rooms[room]
	}
	//当前用户放到房间里
	roomInfo[s] = struct{}{}
	return roomInfo
}

// DelUserByRoom 把用户从房间删除
func (g *GlobalOnlineUser) DelUserByRoom(s *melody.Session, room string) map[*melody.Session]struct{} {
	g.RwMutex.RLock()
	defer g.RwMutex.RUnlock()
	var roomInfo map[*melody.Session]struct{}
	//判断房间存在就删除当前用户
	if _, ok := g.Rooms[room]; ok {
		roomInfo = g.Rooms[room]
		delete(roomInfo, s)
	}
	return roomInfo
}

type IRouter interface {
	// 处理器
	Handle(*Message)
}
type BaseRouter struct {
	Apis map[int]IRouter
	sync.RWMutex
}

func (c *BaseRouter) Handle(*Message) {}
func (c *BaseRouter) AddRouter(msgId int, router IRouter) {
	c.Lock()
	defer c.Unlock()
	c.Apis[msgId] = router
}

// HelloInfo hello消息解析器
type HelloInfo struct{}

func (p *HelloInfo) Handle(request *Message) {
	log.Printf("handle done%v,%d", request.Msg, request.MsgId)
}

var (
	natsURL1     = "nats://192.168.253.1:4222" //nats的连接地址
	natsSubject1 = "websocket_messages"        //订阅的主题
	natsConn     *nats.Conn                    //nats连接
)

// NewMsgHandle 接收订阅消息,并且根据路由id处理
func NewMsgHandle() *BaseRouter {
	//初始化路由map
	msgHandle := &BaseRouter{
		Apis: make(map[int]IRouter),
	}
	// 创建一个 NATS 订阅者,监听消息
	go func() {
		var message Message
		_, err := natsConn.Subscribe(natsSubject1, func(msg *nats.Msg) {
			err := json.Unmarshal(msg.Data, &message)
			if err != nil {
				fmt.Println("解码失败:", err)
				return
			}
			defer func() {
				if err := recover(); err != nil {
					// 防止panic
					log.Println(string(debug.Stack()))
				}
			}()
			handler, ok := msgHandle.Apis[message.MsgId]
			if !ok {
				log.Println("没有找到该方法的协议号")
				return
			}
			handler.Handle(&message)
		})
		if err != nil {
			log.Fatal(err)
		}
	}()
	return msgHandle
}

// Message 消息结构体
type Message struct {
	MsgId int         `json:"msgId"`
	Msg   interface{} `json:"msg"`
}

func main() {
	m := melody.New()
	//m.Config.PingPeriod = time.Second * 3 //心跳包cd
	//m.Config.PongWait = time.Second * 3   //回复客户端心跳包
	var err error
	natsConn, err = nats.Connect(natsURL1)
	if err != nil {
		log.Fatal(err)
	}
	defer natsConn.Close()
	//增加路由
	msgHandle := NewMsgHandle()
	msgHandle.AddRouter(1001, &HelloInfo{})
	//初始化全局room
	rooms := newGlobalOnlineUser()                                      // 用于存储房间和对应的Melody实例
	m.Upgrader.CheckOrigin = func(r *http.Request) bool { return true } //跨域
	// 注册WebSocket连接事件的处理函数,
	m.HandleConnect(func(s *melody.Session) {
		fmt.Println("链接成功", s)
		// 在连接建立时发送欢迎消息
		s.Write([]byte("欢迎登录!"))
	})

	// 注册WebSocket消息事件的处理函数
	m.HandleMessage(func(s *melody.Session, msg []byte) {
		fmt.Println("服务器接收到消息:", string(msg))
		// 解析收到的消息,格式为 "Join:roomName" 或 "Leave:roomName"
		//strs := strings.Split(string(msg), ":")
		//cmd := strs[0]
		//room := strs[1]
		//cmd := "Join"
		//room := "test"
		//switch cmd {
		//case "Join":
		//	joinRoom(s, room, rooms)
		//case "Leave":
		//	leaveRoom(s, room, rooms)
		//default:
		//	m.BroadcastFilter(msg, func(q *melody.Session) bool { //房间群发,遍历全服用户,同一房间的下发消息。
		//		//return q.Get("room") == s.Get("room")
		//		return true
		//	})
		//	m.Broadcast(msg) //群发
		//}
		//发消息
		err = natsConn.Publish(natsSubject1, msg)
		if err != nil {
			log.Fatal(err)
		}
	})
	// 回复客户端心跳包 pong
	m.HandlePong(func(s *melody.Session) {
		if s.IsClosed() {
			fmt.Println("心跳包发出去,但是客户端退出了")
		}
		s.Write([]byte(`{"type":"pong"}`))
	})
	//处理错误,防止panic? TODO
	m.HandleError(func(s *melody.Session, err error) {
		if err != nil {
			fmt.Println("处理错误", err)
		}
	})
	// 注册WebSocket关闭事件的处理函数
	m.HandleDisconnect(func(s *melody.Session) {
		fmt.Println("客户端退出")
		// 断开连接时自动离开当前房间
		room, exies := s.Get("room")
		if exies {
			fmt.Println("存在房间", room)
			leaveRoom(s, room.(string), rooms)
		}
	})

	// 创建HTTP处理器
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// 将HTTP请求升级为WebSocket连接
		err := m.HandleRequest(w, r)
		if err != nil {
			fmt.Println("WebSocket upgrade error:", err)
		}
	})

	// 启动HTTP服务器
	err = http.ListenAndServe(":8080", nil)
	if err != nil {
		fmt.Println("HTTP server error:", err)
	}
}

// 加入房间
func joinRoom(s *melody.Session, room string, rooms *GlobalOnlineUser) {
	//当前用户放到房间里
	roomInfo := rooms.AddUserToRoom(s, room)
	//群发
	for conn, _ := range roomInfo {
		conn.Write([]byte("加入到了房间"))
	}
	s.Set("room", room)
}

// 离开房间
func leaveRoom(s *melody.Session, room string, rooms *GlobalOnlineUser) {
	roomInfo := rooms.DelUserByRoom(s, room)
	//群发
	for conn, _ := range roomInfo {
		conn.Write([]byte("离开了房间"))
	}
	s.UnSet("room")
}

标签:websocket,room,err,nats,Session,func,Go,melody,roomInfo
From: https://www.cnblogs.com/qcy-blog/p/18238697

相关文章

  • Go fmt.Print() 格式化
    %v打印结构体%+V打印带有字段的结构体%T打印对象类型%t打印布尔值%d打印整型数,十进制输出,如果d前面有数字,表示控制输出宽度,默认使用空白填充,%05d会在不满5位时填充0%b打印整型数,二进制输出%c打印整型数,字符输出(如果有)%o打印整型数,八进制输出,如果x前面带有#表示带......
  • go 闭包捕获问题
    在Go语言中,闭包(closure)是一个函数值,它引用了其外部作用域中的变量。简而言之,闭包能够“捕获”并“记住”其外部作用域中的变量,即使这个变量的生命周期已经结束。闭包的这种特性使得它在许多编程场景中非常有用,但也可能导致一些意外行为,尤其是在捕获变量时。捕获问题的例子一个常......
  • 纯CSS+单个div实现抖音LOGO
    纯CSS+单个div就能绘制抖音LOGO关键点:主要借助了两个伪元素实现了整体结构,借助了drop-shadow生成一层整体阴影drop-shadow只能是单层阴影,所以另一层阴影需要多尝试contrast(150%)brightness(110%)则可以增强图像的对比度和亮度,更贴近抖音LOGO的效果预览结果如下:在线......
  • Go语言入门随笔
    基本数据类型intint8有符号无符号字符串bool数组切片(基于数组)引用类型map结构体(嵌套,继承)接口(空接口很强大)指针(将值类型变成了引用类型)函数可以当做参数deferpanicrecoverchannel线程安全sync锁读写锁waitgroup等等协程执行完成。ADD(1)Done()wa......
  • go 操作mac
    cilium1.15.1生成随机macpackagemainimport( "crypto/rand" "fmt" "net")//MACaddressisannet.HardwareAddrencapsulationtoforceciliumtoonlyuseMAC-48.typeMACnet.HardwareAddr//Stringreturnsthestringrepr......
  • Go结构体对齐
    具体可以参考b站的幼麟实验室,很硬核typePstruct{ abool bint32 cint8 dint64 ebyte}varpPfmt.Printf("%v\n",unsafe.Sizeof(p)//32成员变量对齐方式为:min(8,1)=1,由于是第一个成员,偏移量为0即可,此时内存占位为:amin(8,4)=4,偏移量为4即可,此时内存占......
  • 【启程Golang之旅】让文件操作变得简单
    欢迎来到Golang的世界!在当今快节奏的软件开发领域,选择一种高效、简洁的编程语言至关重要。而在这方面,Golang(又称Go)无疑是一个备受瞩目的选择。在本文中,带领您探索Golang的世界,一步步地了解这门语言的基础知识和实用技巧。目录初识文件IO的引入读取文件写入文件文件复制......
  • Dragon_Knight_CTF-stack(栈迁移)
    Dragon_Knight_CTF-stack(栈迁移)程序的保护情况如下,可以看到没有开启pie保护Arch:amd64-64-littleRELRO:PartialRELROStack:NocanaryfoundNX:NXenabledPIE:NoPIE(0x3fe000)可以看道main函数也很简洁,只有一个0x10大小的溢出,程序给了libc,版......
  • Django上传图片时ImageField的max_length报错
    我使用的版本是Django4.2,有一个模型里定义了ImageField,以下面这个为例:classExample(models.Model)image=models.ImageField(blank=True,upload_to=my_image_path,)当我上传图片的时候,django返回了这样一个错误:Ensurethisfilenam......
  • 如何使用Go语言连接和操作数据库?
    文章目录1.安装MySQL驱动2.连接数据库3.执行查询4.执行插入、更新和删除操作在Go语言中,连接和操作数据库通常使用database/sql包,它提供了一个数据库抽象层,支持多种数据库引擎,如MySQL、PostgreSQL、SQLite等。下面我将以MySQL为例,详细讲解如何使用Go语言连接和......