本文目录
一、背景介绍
本文是近期为了参加字节青训营,刚好学完Go的网络编程基础,如TCP\UDP、Socket、WebSocket等,然后根据B站up主码神之路的教学进行WebSocket聊天室项目开发,借此机会刚好复习下前面学过的各类知识点,比如指针、切片、接口、goroutine协程、channel通道、Socket网络编程的、WebSocket等。
先看看演示效果,可以做到多个用户同时进行聊天,类似微信群聊。
总体而言,后端代码大概300行左右,前端代码大概100行,是一个比较简单的websocket类网络demo。
二、后端代码
后端主要由4个文件,分别是connection.go、data.go、hub.go、server.go。
1、data.go
主要定义一个用于存储数据的结构体Data,包含多个字段。
分别是IP地址、用户名、数据来源、请求类型、请求内容、用户列表。
该结构体能够方便地与json数据进行映射和转换,是用来处理网络请求中的数据传输、解析等场景。
接收到了前端发送的包含用户信息、请求内容等JSON数据之后,将其解析为Data结构体实例,方便后续处理;或者将Data结构体实例转换为Json数据发送给前端。
打个比方:json:“ip”,表示在将该结构体转换为 JSON 数据时,该字段对应的 JSON 键名为 “ip”;从 JSON 数据解析为结构体时,也会根据 “ip” 键名来赋值给该字段。
package main
type Data struct {
Ip string `json:"ip"`
User string `json:"user"`
From string `json:"from"`
Type string `json:"type"`
Content string `json:"content"`
UserList []string `json:"user_list"`
}
2、server.go
package main
import (
"fmt"
"net/http"
"github.com/gorilla/mux"
)
func main() {
router := mux.NewRouter()
go h.run()
//当访问127.0.0.1:8080/ws的时候会跳转到myws处理请求
router.HandleFunc("/ws", myws)
//进行一个监听,websocket是基于http协议上的。
if err := http.ListenAndServe("127.0.0.1:8080", router); err != nil {
fmt.Println("err:", err)
}
}
使用到的两个包说明:
net/http:提供 HTTP 协议的客户端和服务器实现,用于启动 HTTP 服务器。
github.com/gorilla/mux:是一个强大的 URL 路由器和调度器,用于处理不同的 HTTP 请求路径。
router := mux.NewRouter()
作用:创建一个新的 mux.Router 实例,用于处理 HTTP 请求的路由。通过它可以为不同的 URL 路径指定不同的处理函数。
go h.run()
作用:启动一个协程(goroutine)来运行 h.run() 方法。这里的 h 是在其他地方(hub.go 文件中)定义的一个 Hub 类型的实例的变量名。h.run() 方法是用于启动 WebSocket 通信中心的主循环,处理客户端的连接、消息转发等核心逻辑。使用 go 关键字启动协程,是为了让 h.run() 方法在后台并发执行,不会阻塞主程序的其他操作,如启动 HTTP 服务器等。
router.HandleFunc("/ws", myws)
作用:为 /ws 路径注册一个处理函数 myws。当有 HTTP 请求访问 /ws 路径时,会调用 myws 函数来处理。这个路径很可能用于 WebSocket 连接的升级,即客户端通过这个路径发起请求,服务器将其升级为 WebSocket 协议的连接,从而实现 WebSocket 通信。
3、hub.go
hub.go 文件定义了一个 hub 结构体,用于管理 WebSocket 连接和消息传递。hub 的 run 方法通过监听多个通道,处理新连接、断开连接和广播消息等操作,是整个 WebSocket 群聊聊天室的核心逻辑部分。
package main
import "encoding/json"
var h = hub{
c: make(map[*connection]bool),
u: make(chan *connection),
b: make(chan []byte),
r: make(chan *connection),
}
type hub struct {
c map[*connection]bool
b chan []byte
r chan *connection
u chan *connection
}
func (h *hub) run() {
for {
select {
case c := <-h.r:
h.c[c] = true
c.data.Ip = c.ws.RemoteAddr().String()
c.data.Type = "handshake"
c.data.UserList = user_list
data_b, _ := json.Marshal(c.data)
c.sc <- data_b
case c := <-h.u:
if _, ok := h.c[c]; ok {
delete(h.c, c)
close(c.sc)
}
case data := <-h.b:
for c := range h.c {
select {
case c.sc <- data:
default:
delete(h.c, c)
close(c.sc)
}
}
}
}
}
import "encoding/json"
作用:导入 encoding/json 包,用于处理 JSON 数据的编码和解码。
type hub struct {
c map[*connection]bool //存储当前活跃的 WebSocket 连接。
b chan []byte //接收要广播的消息。
r chan *connection //通道,接收新建立的连接。
u chan *connection //通道,接收要断开的连接。
}
作用:定义 hub 结构体,用于管理 WebSocket 连接和消息传递。
var h = hub{
c: make(map[*connection]bool),
u: make(chan *connection),
b: make(chan []byte),
r: make(chan *connection),
}
作用:定义并初始化一个全局变量 h,它是 hub 类型的实例。hub 是用于管理 WebSocket 连接和消息传递的核心结构体。
c:map映射,键是 *connection 类型,值是 bool 类型。用于存储当前活跃的 WebSocket 连接,键值对的存在表示该连接是活跃的。
通过make去创建空间初始化。
func (h *hub) run() {
for { //无限循环,持续监听通道中的消息
select { //同时监听多个通道,根据通道中接收到的消息类型执行不同的操作。
case c := <-h.r: //处理新建立的连接
h.c[c] = true // 将新连接 c 添加到活跃连接映射 h.c 中,标记为活跃
c.data.Ip = c.ws.RemoteAddr().String() // 设置连接的 IP 地址为客户端的远程地址
c.data.Type = "handshake" // 设置连接类型为 "handshake",表示这是一个握手连接
c.data.UserList = user_list // 设置连接的用户列表,user_list全局变量,包含当前所有活跃用户的列表
data_b, _ := json.Marshal(c.data) // 将连接数据 c.data 编码为 JSON 格式
c.sc <- data_b // 将 JSON 数据发送给客户端 c,完成握手过程。也就是把数据发回来,这样后面给后面的writer进行输出。
case c := <-h.u: // 监听 h.u 通道,处理要断开的连接
if _, ok := h.c[c]; ok { // 检查连接 c 是否存在于活跃连接映射 h.c 中
delete(h.c, c) // 如果存在,从映射中删除该连接
close(c.sc) // 关闭连接 c 的发送通道 sc,释放资源
}
case data := <-h.b: // 监听 h.b 通道,处理要广播的消息
for c := range h.c { // 遍历所有活跃连接
select { // 使用 select 语句尝试将消息 data 发送给每个连接 c
case c.sc <- data: // 成功发送消息,什么也不做
default: // 如果发送失败(通道已关闭或缓冲区满)
delete(h.c, c) // 从活跃连接映射 h.c 中删除该连接
close(c.sc) // 关闭连接 c 的发送通道 sc
}
}
}
}
}
作用:定义 hub 结构体的 run 方法,用于处理连接和消息的管理。
上面有比较多的c,所以需要弄清楚具体c的定义。case c是指从通道 h.r 和 h.u 中接收到的 *connection 类型的值。
4、connection.go
这个go文件实现了 WebSocket 连接的管理,包括连接的建立、消息的接收和发送、用户列表的管理以及连接的关闭。
通过 myws 函数处理 WebSocket 请求,writer 方法处理消息发送,reader 方法处理消息接收,del 函数用于管理用户列表。
package main
import (
"encoding/json"
"fmt"
"net/http"
"github.com/gorilla/websocket"
)
// connection 结构体表示一个 WebSocket 连接
type connection struct {
ws *websocket.Conn // WebSocket 连接
sc chan []byte // 用于发送消息的通道
data *Data // 存储数据,即Data.go中的数据
}
// wu 是一个 WebSocket Upgrader,用于将 HTTP 连接升级为 WebSocket 连接
var wu = &websocket.Upgrader{ReadBufferSize: 512, // 读取缓冲区大小
WriteBufferSize: 512, // 写入缓冲区大小
CheckOrigin: func(r *http.Request) bool { return true }} // 允许所有跨域请求
// myws 是处理 WebSocket 请求的函数
//得到两个对象,分别是responseWriter还有Request进行处理请求响应
func myws(w http.ResponseWriter, r *http.Request) {
ws, err := wu.Upgrade(w, r, nil) // 将 HTTP 连接升级为 WebSocket 连接
if err != nil {
return
}
c := &connection{sc: make(chan []byte, 256), ws: ws, data: &Data{}} // 创建一个connection 实例
h.r <- c // 将新连接发送到 hub 的 r 通道
go c.writer() // 启动一个协程处理消息发送
c.reader() // 处理消息接收
defer func() { // 退出前的清理工作
c.data.Type = "logout" // 设置连接类型为 "logout"
user_list = del(user_list, c.data.User) // 从用户列表中删除当前用户
c.data.UserList = user_list // 更新用户列表
c.data.Content = c.data.User // 设置内容为当前用户
data_b, _ := json.Marshal(c.data) // 将数据编码为 JSON
h.b <- data_b // 将消息发送到 hub 的 b 通道进行广播
h.r <- c // 将连接发送到 hub 的 r 通道进行处理
}()
}
// writer 方法用于处理消息发送,也就是把消息发送给前端客户端,前端代码会对应的进行解析。
func (c *connection) writer() {
// 将消息发送到 WebSocket 连接
for message := range c.sc {
c.ws.WriteMessage(websocket.TextMessage, message)
}
c.ws.Close() // 关闭 WebSocket 连接
}
var user_list = []string{} // user_list 存储当前所有活跃用户的列表
// reader 方法用于处理消息接收
func (c *connection) reader() {
for {
_, message, err := c.ws.ReadMessage() // 读取 WebSocket 消息
if err != nil {
h.r <- c // 如果读取消息失败,将连接发送到 hub 的 r 通道进行处理
break
}
json.Unmarshal(message, &c.data) // 解码 JSON 消息
switch c.data.Type { // 根据消息类型处理不同的逻辑
case "login": // 登录逻辑
c.data.User = c.data.Content //将 Content 字段的值赋给 User 字段,表示登录的用户。
c.data.From = c.data.User //设置消息的来源为登录的用户。
user_list = append(user_list, c.data.User) //将登录的用户添加到用户列表 user_list 中。
c.data.UserList = user_list //更新 Data 结构体中的用户列表。
data_b, _ := json.Marshal(c.data) //将 Data 结构体编码为 JSON 格式。
h.b <- data_b
case "user": // 用户信息更新逻辑
c.data.Type = "user" //设置消息类型为 "user"。
data_b, _ := json.Marshal(c.data) // Data 结构体编码为 JSON 格式。
h.b <- data_b //将消息发送到 hub 的 b 通道,进行广播。
case "logout": // 登出逻辑
c.data.Type = "logout"
user_list = del(user_list, c.data.User) //从用户列表 user_list 中删除登出的用户。
data_b, _ := json.Marshal(c.data)
h.b <- data_b //将消息发送到 hub 的 b 通道,进行广播。
h.r <- c
default:
fmt.Print("========default================")
}
}
}
// del 函数用于从用户列表中删除指定用户
func del(slice []string, user string) []string {
count := len(slice)
if count == 0 {
return slice
}
if count == 1 && slice[0] == user {
return []string{}
}
var n_slice = []string{}
for i := range slice {
if slice[i] == user && i == count {
return slice[:count]
} else if slice[i] == user {
n_slice = append(slice[:i], slice[i+1:]...)
break
}
}
fmt.Println(n_slice)
return n_slice
}
我们先来整体梳理一下connection.go的主要部分:
- 创建连接实例:在 myws 函数中,创建一个新的 connection 实例 c,并初始化其 sc 通道、ws 连接和 data 数据。
- 将新创建的 connection 实例 c 发送到 hub 的 r 通道,通知 hub 有一个新的连接需要处理。
- 启动消息发送协程:启动一个协程 go c.writer(),用于处理消息的发送。这个协程会从 c.sc 通道中读取消息,并将它们发送到WebSocket 连接 c.ws。
- 处理消息接收:在 myws 函数中,调用 c.reader() 方法处理消息的接收。这个方法会从 WebSocket 连接 c.ws 中读取消息,并根据消息类型进行不同的处理。
- 消息发送:writer 方法是一个无限循环,从 c.sc 通道中读取消息,并将它们发送到 WebSocket的客户端前端 。
- 消息接收:reader 方法是一个无限循环,从 WebSocket 连接 c.ws 中读取消息,并根据消息类型进行不同的处理。
- 清理工作:在 myws 函数的 defer 语句中,进行清理工作,包括发送登出消息、更新用户列表、关闭 WebSocket 连接等。
那么来看两个关键问题:
为什么c可以直接给h的通道发送数据?
h 是一个全局变量,表示 hub 实例。hub 实例包含多个通道,用于管理连接和消息。h.r、h.u 和 h.b 是 hub 实例的通道,分别用于接收新连接、处理断开连接和广播消息。在 myws 函数中,c 作为一个 connection 实例,可以直接将自己发送到 h.r 通道,通知 hub 有一个新的连接需要处理。同样,c 也可以将消息发送到 h.b 通道,通知 hub 有消息需要广播。
为什么 reader 不需要 go 协程开启?
writer 方法是一个无限循环,从 c.sc 通道中读取消息并发送到 WebSocket 连接 c.ws。这个操作是阻塞的,因为 c.ws.WriteMessage 方法会阻塞直到消息成功发送。如果 writer 方法不在独立的协程中运行,它会阻塞 myws 函数的执行,导致 myws 函数无法继续处理其他逻辑,如接收新的消息或处理连接的关闭。
func (c *connection) writer() {
for message := range c.sc {
c.ws.WriteMessage(websocket.TextMessage, message)
}
c.ws.Close()
}
for message := range c.sc:从 c.sc 通道中读取消息,这是一个阻塞操作。
c.ws.WriteMessage(websocket.TextMessage, message):将消息发送到 WebSocket 连接,这也是一个阻塞操作。
reader 方法也是一个无限循环,从 WebSocket 连接 c.ws 中读取消息并处理。这个操作同样是阻塞的,因为 c.ws.ReadMessage 方法会阻塞直到有新的消息到达。然而,reader 方法不需要在独立的协程中运行,原因如下:
1.HTTP 请求处理机制:
当一个 HTTP 请求到达时,Go 的 HTTP 服务器会为每个请求启动一个新的 goroutine 来处理。这意味着 myws 函数本身已经在一个独立的 goroutine 中运行。
myws 函数的主要职责是处理 WebSocket 连接的生命周期,包括接收消息、发送消息和处理连接的关闭。
2.资源管理:
如果 reader 方法在独立的协程中运行,需要额外的同步机制来确保在连接关闭时 reader 协程能够正确退出。会增加代码的复杂性。
通过将 reader 方法保留在 myws 函数的主 goroutine 中,可以简化资源管理。当 myws 函数退出时,reader 方法也会自然退出,确保所有资源都能被正确释放。
myws 函数的逻辑已经很清晰:启动 writer 协程处理消息发送,然后在主 goroutine 中处理消息接收。
5、后端流程
用户A发送消息:
用户A与服务器建立了WebSocket连接,服务器通过myws函数处理这个连接,创建一个connection实例并将其加入到hub中。
用户A在客户端发送消息,消息通过websocket.ReadMessage被读取并传递到connection.reader方法。
服务器根据消息的类型(如"user"),将消息内容解析为Data对象,并将其转发到hub.b通道。
服务器将消息广播给所有连接的客户端:
hub.run()方法会监听hub.b通道,当有消息写入hub.b时,它会遍历所有连接的用户(h.c中的连接)。
服务器会将消息通过c.sc <- data发送给每个连接的客户端,确保所有客户端(包括用户B)都能收到消息。
用户B接收到消息:
用户B的WebSocket连接会接收到消息,并通过c.writer方法将消息发送到B的客户端。
用户B会在其WebSocket客户端上接收到消息并进行处理。
6、一些难点部分
怎么理解hub?
hub 负责管理所有活跃的WebSocket连接。在你的代码中,hub 通过 map[*connection]bool 来存储每个活跃连接。每当一个新的连接建立时,服务器会将该连接添加到 hub.c 中;当连接关闭时,服务器会将连接从 hub.c 中移除。
接收新连接:当一个新的WebSocket连接建立时(即myws函数中被调用),服务器会将新的连接添加到hub.r通道。hub.run() 中的 select 会接收到这个新连接,并将其加入到 hub.c 中,同时初始化连接的一些属性(如用户IP、用户列表等)。
移除连接:当一个连接关闭时,hub.run() 会从 hub.u 通道接收到需要移除的连接,并从 hub.c 中删除该连接。
一个新用户上线的过程?
假设有一个新用户 C 发起了 WebSocket 连接。首先,用户 C 会向服务器发起连接请求(通过 /ws 路径)。这个过程会触发 myws 函数来处理连接。
func myws(w http.ResponseWriter, r *http.Request) {
ws, err := wu.Upgrade(w, r, nil)
if err != nil {
return
}
c := &connection{sc: make(chan []byte, 256), ws: ws, data: &Data{}}
h.r <- c // 将新连接发送到 hub 的 r 通道
go c.writer() // 启动 writer goroutine 来向客户端发送消息
c.reader() // 启动 reader goroutine 来接收客户端的消息
}
这里,c 是新的连接对象,它通过 h.r <- c 被传递到 hub 的 r 通道中。hub.run() 会从 r 通道接收到这个新连接,并将其添加到 hub.c 中。
case "login":
// 用户登录时
c.data.User = c.data.Content
c.data.From = c.data.User
user_list = append(user_list, c.data.User)
c.data.UserList = user_list
data_b, _ := json.Marshal(c.data)
h.b <- data_b
在 hub.run() 中,收到新连接后,执行以下操作:
case c := <-h.r: // 接收到一个新连接
h.c[c] = true // 将新连接添加到 hub 中的连接池
c.data.Ip = c.ws.RemoteAddr().String() // 获取用户 IP 地址
c.data.Type = "handshake" // 设置消息类型为 "handshake"
c.data.UserList = user_list // 更新当前聊天室的用户列表
data_b, _ := json.Marshal(c.data) // 将数据结构转换为 JSON 格式
c.sc <- data_b // 通过连接的 sc 通道将数据发送到客户端
在 hub.run() 中,接下来会广播新用户上线的信息给聊天室中的所有连接:
case data := <-h.b:
for c := range h.c {
select {
case c.sc <- data: // 将数据广播给所有连接,然后所有的连接的writer又写给对应的客户端
default:
delete(h.c, c) // 如果客户端不再活跃,则删除它
close(c.sc) // 关闭通道
}
}
三、前端代码
<!DOCTYPE html>
<html>
<head>
<title></title>
<meta http-equiv="content-type" content="text/html;charset=utf-8">
<style>
p {
text-align: left;
padding-left: 20px;
}
</style>
</head>
<body>
<div style="width: 800px;height: 600px;margin: 30px auto;text-align: center">
<h1>演示聊天室</h1>
<div style="width: 800px;border: 1px solid gray;height: 300px;">
<div style="width: 200px;height: 300px;float: left;text-align: left;">
<p><span>当前在线:</span><span id="user_num">0</span></p>
<div id="user_list" style="overflow: auto;">
</div>
</div>
<div id="msg_list" style="width: 598px;border: 1px solid gray; height: 300px;overflow: scroll;float: left;">
</div>
</div>
<br>
<textarea id="msg_box" rows="6" cols="50" οnkeydοwn="confirm(event)"></textarea><br>
<input type="button" value="发送" οnclick="send()">
</div>
</body>
</html>
<script type="text/javascript">
var uname = prompt('请输入用户名', 'user' + uuid(8, 16)); // 提示用户输入用户名,并生成默认的用户名(user + 随机字符串)
var ws = new WebSocket("ws://127.0.0.1:8080/ws"); // 创建 WebSocket 连接,连接到后台 WebSocket 服务器
ws.onopen = function () {
var data = "系统消息:建立连接成功"; // 连接建立成功后,显示系统消息
listMsg(data); // 调用 listMsg 函数将消息显示在聊天界面
};
ws.onmessage = function (e) {
var msg = JSON.parse(e.data); // 解析从服务器接收到的消息
var sender, user_name, name_list, change_type;
switch (msg.type) {
case 'system':
sender = '系统消息: ';
break;
case 'user':
sender = msg.from + ': ';
break;
case 'handshake':
var user_info = {'type': 'login', 'content': uname}; // 用户首次连接时,发送登录信息
sendMsg(user_info); // 调用 sendMsg 函数发送登录信息
return;
case 'login':
case 'logout':
user_name = msg.content;
name_list = msg.user_list;
change_type = msg.type;
dealUser(user_name, change_type, name_list); // 更新在线用户列表
return;
}
var data = sender + msg.content; // 拼接消息内容
listMsg(data); // 调用 listMsg 函数将消息显示在聊天界面
};
ws.onerror = function () {
var data = "系统消息 : 出错了,请退出重试."; // 如果连接出错,显示错误消息
listMsg(data); // 调用 listMsg 函数将错误消息显示在聊天界面
};
// 确认函数,用户按下回车键时,调用 send 函数发送消息
function confirm(event) {
var key_num = event.keyCode;
if (13 == key_num) {
send(); // 按下回车时调用 send 函数
} else {
return false;
}
}
// 发送消息的函数
function send() {
var msg_box = document.getElementById("msg_box"); // 获取消息输入框
var content = msg_box.value; // 获取输入框中的内容
var reg = new RegExp("\r\n", "g"); // 去掉回车换行符
content = content.replace(reg, "");
var msg = {'content': content.trim(), 'type': 'user'}; // 构造消息对象
sendMsg(msg); // 调用 sendMsg 函数发送消息
msg_box.value = ''; // 清空消息输入框
}
// 将消息添加到消息列表中
function listMsg(data) {
var msg_list = document.getElementById("msg_list"); // 获取消息列表元素
var msg = document.createElement("p"); // 创建新的 p 元素显示消息
msg.innerHTML = data; // 设置消息内容
msg_list.appendChild(msg); // 将消息添加到消息列表中
msg_list.scrollTop = msg_list.scrollHeight; // 滚动到消息列表的底部
}
// 更新在线用户列表和当前在线人数
function dealUser(user_name, type, name_list) {
var user_list = document.getElementById("user_list"); // 获取用户列表元素
var user_num = document.getElementById("user_num"); // 获取在线人数显示元素
while(user_list.hasChildNodes()) {
user_list.removeChild(user_list.firstChild); // 清空现有用户列表
}
// 遍历用户列表,显示每个用户
for (var index in name_list) {
var user = document.createElement("p");
user.innerHTML = name_list[index];
user_list.appendChild(user);
}
user_num.innerHTML = name_list.length; // 更新当前在线人数
user_list.scrollTop = user_list.scrollHeight; // 滚动到用户列表的底部
var change = type == 'login' ? '上线' : '下线'; // 判断用户是上线还是下线
var data = '系统消息: ' + user_name + ' 已' + change; // 构造上线/下线消息
listMsg(data); // 将上线/下线消息显示在聊天界面
}
// 发送消息到 WebSocket 服务器
function sendMsg(msg) {
var data = JSON.stringify(msg); // 将消息对象转为 JSON 字符串
ws.send(data); // 通过 WebSocket 发送消息
}
// 生成唯一的 UUID,保证每个用户的用户名是唯一的
function uuid(len, radix) {
var chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');
var uuid = [], i;
radix = radix || chars.length;
if (len) {
for (i = 0; i < len; i++) uuid[i] = chars[0 | Math.random() * radix];
} else {
var r;
uuid[8] = uuid[13] = uuid[18] = uuid[23] = '-';
uuid[14] = '4';
for (i = 0; i < 36; i++) {
if (!uuid[i]) {
r = 0 | Math.random() * 16;
uuid[i] = chars[(i == 19) ? (r & 0x3) | 0x8 : r];
}
}
}
return uuid.join(''); // 返回生成的唯一 UUID
}
</script>
加上前端代码就很好理解了。
前端客户端发送数据包等,服务器端进行对应的reader,然后如果有c.sc,那么就会writer写给客户端(前端)的ws。
标签:群聊,hub,user,Go,WebSocket,data,连接,消息 From: https://blog.csdn.net/theaipower/article/details/145147219