服务端
开启 WebSocket 中间件
// UseWebSockets must be registered first; without it WebSocket requests cannot be used.
app.UseWebSockets(new WebSocketOptions { KeepAliveInterval = TimeSpan.FromMinutes(1) });

// Register the middleware that handles the WebSocket requests.
app.UseMiddleware<WebsocketMiddlware>();
WebsocketMiddlware
/// <summary>
/// Middleware that accepts WebSocket connections on <c>/ws</c> and echoes every
/// text message back to the sender. Requests for any other path are forwarded
/// to the next middleware in the pipeline.
/// </summary>
public class WebsocketMiddlware
{
    // Size of the receive buffer handed to the message reader.
    private const int DefaultBufferSize = 2 * 1024;

    // Upper bound, in bytes, passed to the message reader for a single message.
    private const int MaxMessageSize = 10 * 1024;

    private readonly RequestDelegate _next;

    /// <summary>Creates the middleware.</summary>
    /// <param name="next">The next delegate in the request pipeline.</param>
    public WebsocketMiddlware(RequestDelegate next)
    {
        _next = next;
    }

    /// <summary>
    /// Handles one HTTP request: upgrades <c>/ws</c> requests to a WebSocket and
    /// runs the echo loop, answers non-WebSocket requests to <c>/ws</c> with 400,
    /// and passes everything else on.
    /// </summary>
    /// <param name="context">The current HTTP context.</param>
    public async Task InvokeAsync(HttpContext context)
    {
        if (context.Request.Path != "/ws")
        {
            await _next(context);
            return;
        }

        if (!context.WebSockets.IsWebSocketRequest)
        {
            context.Response.StatusCode = StatusCodes.Status400BadRequest;
            return;
        }

        // `using` guarantees the socket is released even when the reader or a
        // send throws; previously it was disposed only on the clean-close path.
        using (var webSocket = await context.WebSockets.AcceptWebSocketAsync())
        {
            await EchoUntilClosedAsync(webSocket);
        }
    }

    /// <summary>
    /// Reads messages until the peer closes the connection or sends an
    /// unsupported (binary) message, echoing each text message back.
    /// </summary>
    private async Task EchoUntilClosedAsync(WebSocket webSocket)
    {
        while (true)
        {
            var incomingMessage = await WebSocketMessageReader.ReadMessageAsync(webSocket, DefaultBufferSize, MaxMessageSize);
            switch (incomingMessage.MessageType)
            {
                case WebSocketMessageType.Text:
                    await SendMessage(webSocket, incomingMessage.Data.ToString()!);
                    break;

                case WebSocketMessageType.Binary:
                    // Binary payloads are not supported: tell the peer so with the
                    // dedicated close status instead of throwing a bare Exception.
                    await webSocket.CloseAsync(WebSocketCloseStatus.InvalidMessageType, "不支持的类型", CancellationToken.None);
                    return;

                default:
                    // Close message received: complete the closing handshake.
                    await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
                    return;
            }
        }
    }

    /// <summary>Sends <paramref name="message"/> as a single UTF-8 text message.</summary>
    private async Task SendMessage(WebSocket webSocket, string message)
    {
        var data = Encoding.UTF8.GetBytes(message);
        await webSocket.SendAsync(new ArraySegment<byte>(data, 0, data.Length), WebSocketMessageType.Text, true,
            CancellationToken.None);
    }
}
/// <summary>
/// Reads one complete WebSocket message at a time, coalescing multi-fragment
/// messages into a single <see cref="WebSocketMessage"/>.
/// </summary>
public class WebSocketMessageReader
{
    private static readonly ArraySegment<byte> _emptyArraySegment = new ArraySegment<byte>(Array.Empty<byte>());

    /// <summary>
    /// Receives the next complete message from <paramref name="webSocket"/>.
    /// </summary>
    /// <param name="webSocket">The socket to read from.</param>
    /// <param name="bufferSize">Size in bytes of the receive buffer; must be positive.</param>
    /// <param name="maxMessageSize">Maximum accepted message size in bytes, or <c>null</c> for no limit.</param>
    /// <param name="cancellationToken">Token used to cancel the pending receive operations.</param>
    /// <returns>
    /// The received text or binary message, or <see cref="WebSocketMessage.CloseMessage"/>
    /// when a close message is received.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="webSocket"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is not positive.</exception>
    /// <exception cref="InvalidOperationException">
    /// The message exceeds <paramref name="maxMessageSize"/>, the message type changes
    /// between fragments, or the message type is unknown.
    /// </exception>
    public static async Task<WebSocketMessage> ReadMessageAsync(WebSocket webSocket, int bufferSize, int? maxMessageSize, CancellationToken cancellationToken = default)
    {
        if (webSocket == null)
        {
            throw new ArgumentNullException(nameof(webSocket));
        }
        if (bufferSize <= 0)
        {
            // A zero-length buffer can never make progress on a non-empty message.
            throw new ArgumentOutOfRangeException(nameof(bufferSize), "Buffer size must be positive");
        }

        WebSocketMessage message;

        // First read: uses an empty buffer, so it can only complete a close
        // message or an empty message without allocating a receive buffer.
        WebSocketReceiveResult receiveResult = await webSocket.ReceiveAsync(_emptyArraySegment, cancellationToken);
        if (TryGetMessage(receiveResult, null, out message))
        {
            return message;
        }

        // Second read: the message may fit entirely into one buffer.
        var buffer = new byte[bufferSize];
        var arraySegment = new ArraySegment<byte>(buffer);
        receiveResult = await webSocket.ReceiveAsync(arraySegment, cancellationToken);
        if (receiveResult.MessageType != WebSocketMessageType.Close)
        {
            // Enforce the limit on the single-read path too; otherwise a message
            // larger than maxMessageSize slips through when maxMessageSize < bufferSize.
            EnsureWithinLimit(receiveResult.Count, maxMessageSize);
        }
        if (TryGetMessage(receiveResult, buffer, out message))
        {
            return message;
        }

        // Multi-fragment message: accumulate fragments until end-of-message.
        ByteBuffer bytebuffer = new ByteBuffer(maxMessageSize);
        bytebuffer.Append(BufferSliceToByteArray(buffer, receiveResult.Count));
        WebSocketMessageType originalMessageType = receiveResult.MessageType;
        while (true)
        {
            // Loop until an error occurs or we see the end of the message.
            receiveResult = await webSocket.ReceiveAsync(arraySegment, cancellationToken);
            if (receiveResult.MessageType == WebSocketMessageType.Close)
            {
                return WebSocketMessage.CloseMessage;
            }
            if (receiveResult.MessageType != originalMessageType)
            {
                throw new InvalidOperationException("Incorrect message type");
            }

            bytebuffer.Append(BufferSliceToByteArray(buffer, receiveResult.Count));
            if (receiveResult.EndOfMessage)
            {
                switch (receiveResult.MessageType)
                {
                    case WebSocketMessageType.Binary:
                        return new WebSocketMessage(bytebuffer.GetByteArray(), WebSocketMessageType.Binary);
                    case WebSocketMessageType.Text:
                        return new WebSocketMessage(bytebuffer.GetString(), WebSocketMessageType.Text);
                    default:
                        throw new InvalidOperationException("Unknown message type");
                }
            }
        }
    }

    // Throws when a limit is configured and count exceeds it.
    private static void EnsureWithinLimit(int count, int? maxMessageSize)
    {
        if (maxMessageSize.HasValue && count > maxMessageSize.Value)
        {
            throw new InvalidOperationException("Buffer length exceeded");
        }
    }

    // Copies the first `count` bytes of `buffer` into a new array.
    private static byte[] BufferSliceToByteArray(byte[] buffer, int count)
    {
        byte[] newArray = new byte[count];
        Buffer.BlockCopy(buffer, 0, newArray, 0, count);
        return newArray;
    }

    // Decodes the first `count` bytes of `buffer` as UTF-8.
    private static string BufferSliceToString(byte[] buffer, int count)
    {
        return Encoding.UTF8.GetString(buffer, 0, count);
    }

    // Produces a message when the receive result is a close message or completes
    // a message in one read. A null buffer means the read used the empty segment,
    // so a completed message is necessarily empty. Returns false when more
    // fragments are needed.
    private static bool TryGetMessage(WebSocketReceiveResult receiveResult, byte[] buffer, out WebSocketMessage message)
    {
        message = null;
        if (receiveResult.MessageType == WebSocketMessageType.Close)
        {
            message = WebSocketMessage.CloseMessage;
        }
        else if (receiveResult.EndOfMessage)
        {
            // Single-fragment messages are expected to be common, so they are
            // handled here without going through ByteBuffer.
            switch (receiveResult.MessageType)
            {
                case WebSocketMessageType.Binary:
                    message = buffer == null
                        ? WebSocketMessage.EmptyBinaryMessage
                        : new WebSocketMessage(BufferSliceToByteArray(buffer, receiveResult.Count), WebSocketMessageType.Binary);
                    break;
                case WebSocketMessageType.Text:
                    message = buffer == null
                        ? WebSocketMessage.EmptyTextMessage
                        : new WebSocketMessage(BufferSliceToString(buffer, receiveResult.Count), WebSocketMessageType.Text);
                    break;
                default:
                    throw new InvalidOperationException("Unknown message type");
            }
        }
        return message != null;
    }
}
/// <summary>
/// Accumulates byte segments up to an optional maximum total length and
/// exposes them as a single byte array or as a UTF-8 decoded string.
/// </summary>
public sealed class ByteBuffer
{
    private int _currentLength;
    private readonly int? _maxLength;
    private readonly List<byte[]> _segments = new List<byte[]>();

    /// <summary>Creates an empty buffer.</summary>
    /// <param name="maxLength">Maximum total number of bytes, or <c>null</c> for no limit.</param>
    public ByteBuffer(int? maxLength)
    {
        _maxLength = maxLength;
    }

    /// <summary>Appends a segment; the array is stored by reference, not copied.</summary>
    /// <param name="segment">The bytes to append.</param>
    /// <exception cref="ArgumentNullException"><paramref name="segment"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The total length would exceed the maximum.</exception>
    /// <exception cref="OverflowException">The total length would overflow an <see cref="int"/>.</exception>
    public void Append(byte[] segment)
    {
        if (segment == null)
        {
            throw new ArgumentNullException(nameof(segment));
        }

        // Validate before committing: a rejected segment must not inflate
        // _currentLength, or GetByteArray would return trailing zero bytes.
        int newLength = checked(_currentLength + segment.Length);
        if (_maxLength.HasValue && newLength > _maxLength.Value)
        {
            throw new InvalidOperationException("Buffer length exceeded");
        }

        _currentLength = newLength;
        _segments.Add(segment);
    }

    /// <summary>Returns all appended segments concatenated into a new byte array.</summary>
    public byte[] GetByteArray()
    {
        byte[] newArray = new byte[_currentLength];
        int offset = 0;
        foreach (byte[] segment in _segments)
        {
            Buffer.BlockCopy(segment, 0, newArray, offset, segment.Length);
            offset += segment.Length;
        }
        return newArray;
    }

    /// <summary>
    /// Decodes the appended segments as UTF-8 and returns the resulting string.
    /// </summary>
    public string GetString()
    {
        StringBuilder builder = new StringBuilder();
        // A stateful decoder carries a multi-byte sequence that is split across
        // two segments over to the next call; it is flushed on the last segment.
        Decoder decoder = Encoding.UTF8.GetDecoder();
        for (int i = 0; i < _segments.Count; i++)
        {
            bool flush = (i == _segments.Count - 1);
            byte[] segment = _segments[i];
            int charsRequired = decoder.GetCharCount(segment, 0, segment.Length, flush);
            char[] chars = new char[charsRequired];
            int charsConverted = decoder.GetChars(segment, 0, segment.Length, chars, 0, flush);
            builder.Append(chars, 0, charsConverted);
        }
        return builder.ToString();
    }
}
/// <summary>
/// A complete WebSocket message: its payload together with the message type.
/// </summary>
public class WebSocketMessage
{
    /// <summary>Shared instance representing a close message; its <see cref="Data"/> is null.</summary>
    public static readonly WebSocketMessage CloseMessage = new WebSocketMessage(null, WebSocketMessageType.Close);

    /// <summary>Shared instance representing a binary message with a zero-length payload.</summary>
    public static readonly WebSocketMessage EmptyBinaryMessage = new WebSocketMessage(Array.Empty<byte>(), WebSocketMessageType.Binary);

    /// <summary>Shared instance representing a text message with an empty string payload.</summary>
    public static readonly WebSocketMessage EmptyTextMessage = new WebSocketMessage(string.Empty, WebSocketMessageType.Text);

    /// <summary>The payload passed to the constructor.</summary>
    public readonly object Data;

    /// <summary>The type of the message.</summary>
    public readonly WebSocketMessageType MessageType;

    /// <summary>Creates a message from a payload and its type.</summary>
    /// <param name="data">The payload.</param>
    /// <param name="messageType">The message type.</param>
    public WebSocketMessage(object data, WebSocketMessageType messageType)
        => (Data, MessageType) = (data, messageType);
}
客户端
// Open the connection. A page served over http uses ws://, one served over https uses wss://.
let ws = new WebSocket('wss://localhost:7293/ws')

// Called once the connection has been established.
ws.onopen = () => {
  console.log('客户端连接成功')
  // Send a first message to the server.
  ws.send('hello')
}

// Called whenever a message arrives from the server.
ws.onmessage = (event) => {
  console.log('收到服务器响应', event.data)
}

// Called after the connection has been closed.
ws.onclose = () => {
  console.log("关闭客户端连接")
}

// Called when the connection fails.
ws.onerror = () => {
  console.log("连接失败了")
}
/**
 * Sends the current value of the `#message` input to the server.
 *
 * Does nothing (apart from logging a warning) when the input element is
 * missing or the socket is not open.
 */
function send() {
  const input = document.getElementById('message')
  if (!input) {
    console.warn('找不到 id 为 message 的输入框')
    return
  }
  // ws.send() throws InvalidStateError while the socket is still connecting
  // and silently drops the data once it is closing/closed, so check first.
  if (ws.readyState !== WebSocket.OPEN) {
    console.warn('WebSocket 尚未连接,消息未发送')
    return
  }
  ws.send(input.value)
}
标签:WebSocketMessage,WebSocketMessageType,receiveResult,new,message,WebSocket1,public
From: https://www.cnblogs.com/readafterme/p/18412549