示例用的github.com/olahol/melody其实是在gorilla/websocket的基础上封装了一下
package main
import (
"encoding/json"
"fmt"
"github.com/nats-io/nats.go"
"github.com/olahol/melody"
"log"
"net/http"
"runtime/debug"
"sync"
)
// GlobalOnlineUser 管理room
type GlobalOnlineUser struct {
Rooms map[string]map[*melody.Session]struct{}
RwMutex *sync.RWMutex
}
// 初始化room
func newGlobalOnlineUser() *GlobalOnlineUser {
return &GlobalOnlineUser{
RwMutex: &sync.RWMutex{},
Rooms: make(map[string]map[*melody.Session]struct{}),
}
}
// AddUserToRoom 把用户加入到房间
func (g *GlobalOnlineUser) AddUserToRoom(s *melody.Session, room string) map[*melody.Session]struct{} {
g.RwMutex.RLock()
defer g.RwMutex.RUnlock()
var roomInfo map[*melody.Session]struct{}
//判断如果是新房间就初始化,老房间就加入
if _, ok := g.Rooms[room]; !ok {
roomInfo = make(map[*melody.Session]struct{})
g.Rooms[room] = roomInfo
} else {
roomInfo = g.Rooms[room]
}
//当前用户放到房间里
roomInfo[s] = struct{}{}
return roomInfo
}
// DelUserByRoom 把用户从房间删除
func (g *GlobalOnlineUser) DelUserByRoom(s *melody.Session, room string) map[*melody.Session]struct{} {
g.RwMutex.RLock()
defer g.RwMutex.RUnlock()
var roomInfo map[*melody.Session]struct{}
//判断房间存在就删除当前用户
if _, ok := g.Rooms[room]; ok {
roomInfo = g.Rooms[room]
delete(roomInfo, s)
}
return roomInfo
}
type IRouter interface {
// 处理器
Handle(*Message)
}
type BaseRouter struct {
Apis map[int]IRouter
sync.RWMutex
}
func (c *BaseRouter) Handle(*Message) {}
func (c *BaseRouter) AddRouter(msgId int, router IRouter) {
c.Lock()
defer c.Unlock()
c.Apis[msgId] = router
}
// HelloInfo hello消息解析器
type HelloInfo struct{}
func (p *HelloInfo) Handle(request *Message) {
log.Printf("handle done%v,%d", request.Msg, request.MsgId)
}
var (
natsURL1 = "nats://192.168.253.1:4222" //nats的连接地址
natsSubject1 = "websocket_messages" //订阅的主题
natsConn *nats.Conn //nats连接
)
// NewMsgHandle 接收订阅消息,并且根据路由id处理
func NewMsgHandle() *BaseRouter {
//初始化路由map
msgHandle := &BaseRouter{
Apis: make(map[int]IRouter),
}
// 创建一个 NATS 订阅者,监听消息
go func() {
var message Message
_, err := natsConn.Subscribe(natsSubject1, func(msg *nats.Msg) {
err := json.Unmarshal(msg.Data, &message)
if err != nil {
fmt.Println("解码失败:", err)
return
}
defer func() {
if err := recover(); err != nil {
// 防止panic
log.Println(string(debug.Stack()))
}
}()
handler, ok := msgHandle.Apis[message.MsgId]
if !ok {
log.Println("没有找到该方法的协议号")
return
}
handler.Handle(&message)
})
if err != nil {
log.Fatal(err)
}
}()
return msgHandle
}
// Message 消息结构体
type Message struct {
MsgId int `json:"msgId"`
Msg interface{} `json:"msg"`
}
func main() {
m := melody.New()
//m.Config.PingPeriod = time.Second * 3 //心跳包cd
//m.Config.PongWait = time.Second * 3 //回复客户端心跳包
var err error
natsConn, err = nats.Connect(natsURL1)
if err != nil {
log.Fatal(err)
}
defer natsConn.Close()
//增加路由
msgHandle := NewMsgHandle()
msgHandle.AddRouter(1001, &HelloInfo{})
//初始化全局room
rooms := newGlobalOnlineUser() // 用于存储房间和对应的Melody实例
m.Upgrader.CheckOrigin = func(r *http.Request) bool { return true } //跨域
// 注册WebSocket连接事件的处理函数,
m.HandleConnect(func(s *melody.Session) {
fmt.Println("链接成功", s)
// 在连接建立时发送欢迎消息
s.Write([]byte("欢迎登录!"))
})
// 注册WebSocket消息事件的处理函数
m.HandleMessage(func(s *melody.Session, msg []byte) {
fmt.Println("服务器接收到消息:", string(msg))
// 解析收到的消息,格式为 "Join:roomName" 或 "Leave:roomName"
//strs := strings.Split(string(msg), ":")
//cmd := strs[0]
//room := strs[1]
//cmd := "Join"
//room := "test"
//switch cmd {
//case "Join":
// joinRoom(s, room, rooms)
//case "Leave":
// leaveRoom(s, room, rooms)
//default:
// m.BroadcastFilter(msg, func(q *melody.Session) bool { //房间群发,遍历全服用户,同一房间的下发消息。
// //return q.Get("room") == s.Get("room")
// return true
// })
// m.Broadcast(msg) //群发
//}
//发消息
err = natsConn.Publish(natsSubject1, msg)
if err != nil {
log.Fatal(err)
}
})
// 回复客户端心跳包 pong
m.HandlePong(func(s *melody.Session) {
if s.IsClosed() {
fmt.Println("心跳包发出去,但是客户端退出了")
}
s.Write([]byte(`{"type":"pong"}`))
})
//处理错误,防止panic? TODO
m.HandleError(func(s *melody.Session, err error) {
if err != nil {
fmt.Println("处理错误", err)
}
})
// 注册WebSocket关闭事件的处理函数
m.HandleDisconnect(func(s *melody.Session) {
fmt.Println("客户端退出")
// 断开连接时自动离开当前房间
room, exies := s.Get("room")
if exies {
fmt.Println("存在房间", room)
leaveRoom(s, room.(string), rooms)
}
})
// 创建HTTP处理器
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// 将HTTP请求升级为WebSocket连接
err := m.HandleRequest(w, r)
if err != nil {
fmt.Println("WebSocket upgrade error:", err)
}
})
// 启动HTTP服务器
err = http.ListenAndServe(":8080", nil)
if err != nil {
fmt.Println("HTTP server error:", err)
}
}
// 加入房间
func joinRoom(s *melody.Session, room string, rooms *GlobalOnlineUser) {
//当前用户放到房间里
roomInfo := rooms.AddUserToRoom(s, room)
//群发
for conn, _ := range roomInfo {
conn.Write([]byte("加入到了房间"))
}
s.Set("room", room)
}
// 离开房间
func leaveRoom(s *melody.Session, room string, rooms *GlobalOnlineUser) {
roomInfo := rooms.DelUserByRoom(s, room)
//群发
for conn, _ := range roomInfo {
conn.Write([]byte("离开了房间"))
}
s.UnSet("room")
}
标签:websocket,room,err,nats,Session,func,Go,melody,roomInfo
From: https://www.cnblogs.com/qcy-blog/p/18238697