WebSocket 集成指南
本文档描述了如何在基于 go-zero 框架的项目中集成 WebSocket。
1. 安装依赖
首先,安装 gorilla/websocket 库:
go get github.com/gorilla/websocket
2. 项目结构
在项目中添加以下文件和目录:
└── pkg
└── websocket
└── websocket.go
3. WebSocket 实现
在 pkg/websocket/websocket.go
中实现 WebSocket 处理器:
package websocket
import (
"net/http"
"sync"
"github.com/gorilla/websocket"
"github.com/zeromicro/go-zero/core/logx"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 允许所有来源,您可能需要根据实际情况修改这个检查
},
}
type Connection struct {
conn *websocket.Conn
mu sync.Mutex
}
type Hub struct {
connections map[*Connection]bool
broadcast chan []byte
register chan *Connection
unregister chan *Connection
}
func NewHub() *Hub {
return &Hub{
connections: make(map[*Connection]bool),
broadcast: make(chan []byte),
register: make(chan *Connection),
unregister: make(chan *Connection),
}
}
func (h *Hub) Run() {
for {
select {
case conn := <-h.register:
h.connections[conn] = true
case conn := <-h.unregister:
if _, ok := h.connections[conn]; ok {
delete(h.connections, conn)
conn.conn.Close()
}
case message := <-h.broadcast:
for conn := range h.connections {
conn.Write(message)
}
}
}
}
func (c *Connection) Write(message []byte) {
c.mu.Lock()
defer c.mu.Unlock()
err := c.conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
logx.Errorf("Error writing message: %v", err)
}
}
func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logx.Errorf("Error upgrading to WebSocket: %v", err)
return
}
c := &Connection{conn: conn}
h.register <- c
defer func() {
h.unregister <- c
}()
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logx.Errorf("Error reading message: %v", err)
}
break
}
h.broadcast <- message
}
}
4. 配置
在 internal/config/config.go
中添加 WebSocket 配置:
type Config struct {
rest.RestConf
WebSocket struct {
Path string
}
// 其他配置...
}
在 etc/standard-api.yaml
中添加 WebSocket 配置:
WebSocket:
Path: "/ws"
# 其他配置...
5. 服务上下文集成
在 internal/svc/service_context.go
中初始化 WebSocket Hub:
type ServiceContext struct {
Config config.Config
WSHub *websocket.Hub
// 其他服务...
}
func NewServiceContext(c config.Config) *ServiceContext {
wsHub := websocket.NewHub()
go wsHub.Run()
return &ServiceContext{
Config: c,
WSHub: wsHub,
// 初始化其他服务...
}
}
6. 路由注册
在 internal/handler/routes.go
中添加 WebSocket 路由:
func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
// 其他路由...
server.AddRoute(
rest.Route{
Method: http.MethodGet,
Path: serverCtx.Config.WebSocket.Path,
Handler: serverCtx.WSHub.HandleWebSocket,
},
)
}
7. 在 Logic 层中使用
在 internal/logic/standard.go
中使用 WebSocket:
func (l *StandardLogic) BroadcastMessage(message string) {
l.svcCtx.WSHub.broadcast <- []byte(message)
}
8. 与 Kafka 集成(可选)
如果您想将 Kafka 消息转发到 WebSocket 客户端,可以在 Kafka 消费者处理函数中添加:
func (l *StandardLogic) StartConsumer() error {
consumer, err := kafka.NewConsumer(l.svcCtx.Config.Kafka.Brokers, consts.ConsumerGroupExample1)
if err != nil {
return err
}
defer consumer.Close()
return consumer.Consume(l.ctx, []string{consts.TopicExample1}, func(message *sarama.ConsumerMessage) error {
// 处理消息
l.Infof("Received message: %s", string(message.Value))
// 将 Kafka 消息广播到 WebSocket 客户端
l.BroadcastMessage(string(message.Value))
return nil
})
}
这样,您就在项目中成功封装了 WebSocket 功能。客户端可以通过配置的 WebSocket 路径(例如 ws://your-server/ws
)连接到 WebSocket 服务。您可以在 logic 层中使用 BroadcastMessage
方法向所有连接的客户端发送消息,也可以将 Kafka 消息转发到 WebSocket 客户端。
注意:这个实现是基本的,您可能需要根据具体需求进行进一步的优化和扩展,比如添加身份验证、错误处理、心跳检测等功能。
标签:websocket,Connection,zero,func,go,WebSocket,Config From: https://www.cnblogs.com/lwhzj/p/18350799