Server
/// <summary>
/// TCP line server: listens on 127.0.0.1:5000, accepts connections through
/// <see cref="SocketAsyncEventArgs"/>, and prints every '\n'-terminated UTF-8 line
/// received from each client, buffering the bytes with System.IO.Pipelines.
/// </summary>
internal class Program
{
    /// <summary>
    /// Longest line (in bytes, excluding the terminator) a client may send. It is kept
    /// below the pipe's default back-pressure threshold (64 KiB per the PipeOptions
    /// defaults) so an unterminated line is rejected before reader and writer can end up
    /// waiting on each other.
    /// </summary>
    private const int MaxLineLength = 32 * 1024;

    /// <summary>
    /// Starts listening, arms the first accept and then blocks until a key is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static Task Main(string[] args)
    {
        var endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 5000);
        // Disposed when Main returns; the pending accept then completes with OperationAborted.
        using var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
        socket.Bind(endpoint);
        socket.Listen(100);

        var e = new SocketAsyncEventArgs();
        e.UserToken = socket;
        e.Completed += Args_Completed;
        // AcceptAsync returns false when it completed synchronously; Completed is not raised then.
        if (!socket.AcceptAsync(e))
        {
            OnAccepted(e);
        }

        Console.ReadKey();
        return Task.CompletedTask;
    }

    /// <summary>Completion callback for accepts that finished asynchronously.</summary>
    private static void Args_Completed(object? sender, SocketAsyncEventArgs e)
    {
        OnAccepted(e);
    }

    /// <summary>
    /// Handles a completed accept and re-arms the listener for the next connection.
    /// </summary>
    /// <param name="e">Event args whose <c>UserToken</c> holds the listening socket.</param>
    private static void OnAccepted(SocketAsyncEventArgs e)
    {
        var listener = (Socket)e.UserToken!;

        // Loop rather than recurse: every synchronously completed accept would otherwise
        // add another stack frame, and a burst of connections could overflow the stack.
        while (true)
        {
            if (e.LastOperation == SocketAsyncOperation.Accept &&
                e.SocketError == SocketError.Success &&
                e.AcceptSocket is { } client)
            {
                // Handle the new connection.
                Console.WriteLine("接收到新连接");
                // Task.Run unwraps the async delegate; ProcessLinesAsync logs its own
                // failures, so discarding the task here does not hide exceptions.
                _ = Task.Run(() => ProcessLinesAsync(client));
            }
            else if (e.SocketError == SocketError.OperationAborted)
            {
                // The listening socket was closed; stop accepting.
                return;
            }
            else if (e.SocketError != SocketError.Success)
            {
                Console.WriteLine($"Accept failed: {e.SocketError}");
            }

            // Listen for the next connection; the args object must be cleared before reuse.
            e.AcceptSocket = null;
            try
            {
                if (listener.AcceptAsync(e))
                {
                    // Pending: Args_Completed will call back into OnAccepted.
                    return;
                }
            }
            catch (Exception ex)
            {
                // Typically the listener has been disposed; log and stop accepting.
                Console.WriteLine(ex.ToString());
                return;
            }
        }
    }

    /// <summary>
    /// Serves one client connection: pumps socket data into a pipe while printing the
    /// lines read from it, and closes the socket when finished.
    /// </summary>
    /// <param name="state">The accepted client <see cref="Socket"/>.</param>
    private static async Task ProcessLinesAsync(object state)
    {
        var socket = (Socket)state;
        try
        {
            // Open a pipe so receiving and parsing run concurrently.
            var pipe = new Pipe();
            Task writing = FillPipeAsync(socket, pipe.Writer);
            Task reading = ReadPipeAsync(pipe.Reader);
            try
            {
                await reading;
            }
            finally
            {
                // On a normal shutdown the writer is already done. If the reader failed,
                // disposing the socket aborts the pending receive so the writer can finish.
                socket.Dispose();
                await writing;
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }

    /// <summary>
    /// Reads from the pipe until the writer completes, printing each complete line.
    /// </summary>
    /// <param name="reader">Reader side of the connection's pipe.</param>
    /// <exception cref="InvalidDataException">
    /// A line exceeded <see cref="MaxLineLength"/> bytes without a terminator.
    /// </exception>
    private static async Task ReadPipeAsync(PipeReader reader)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = result.Buffer;

                while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
                {
                    // Process the line.
                    Console.WriteLine(Encoding.UTF8.GetString(line));
                }

                if (buffer.Length > MaxLineLength)
                {
                    throw new InvalidDataException(
                        $"Line exceeds {MaxLineLength} bytes without a terminator.");
                }

                // Everything before buffer.Start is consumed; the unterminated tail has
                // been examined and stays buffered until more data arrives.
                reader.AdvanceTo(buffer.Start, buffer.End);

                // Stop reading if there's no more data coming.
                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            // Complete the reader even on failure so the writer's FlushAsync is released.
            await reader.CompleteAsync(ex);
            throw;
        }

        // Mark the PipeReader as complete.
        await reader.CompleteAsync();
    }

    /// <summary>
    /// Tries to cut one '\n'-terminated line off the front of <paramref name="buffer"/>.
    /// </summary>
    /// <param name="buffer">Unread data; advanced past the terminator on success.</param>
    /// <param name="line">The bytes before the '\n' (terminator excluded).</param>
    /// <returns><c>true</c> if a complete line was found; otherwise <c>false</c>.</returns>
    private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
    {
        // Look for an EOL in the buffer.
        SequencePosition? position = buffer.PositionOf((byte)'\n');
        if (position == null)
        {
            line = default;
            return false;
        }

        // Skip the line + the \n.
        line = buffer.Slice(0, position.Value);
        buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        return true;
    }

    /// <summary>
    /// Receives from the socket into the pipe until the peer closes, a receive fails,
    /// or the reader side has completed.
    /// </summary>
    /// <param name="socket">Connected client socket to read from.</param>
    /// <param name="writer">Writer side of the connection's pipe.</param>
    private static async Task FillPipeAsync(Socket socket, PipeWriter writer)
    {
        const int minimumBufferSize = 10;
        while (true)
        {
            // Ask the pipe for writable memory.
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);
            try
            {
                var bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
                if (bytesRead == 0)
                {
                    // Zero bytes means the peer closed the connection.
                    break;
                }

                // Tell the PipeWriter how many bytes were written.
                writer.Advance(bytesRead);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                break;
            }

            // Make the data available to the reader.
            FlushResult result = await writer.FlushAsync();
            if (result.IsCompleted)
            {
                // The reader has completed; nobody will consume further data.
                break;
            }
        }

        await writer.CompleteAsync();
    }
}
Client
/// <summary>
/// TCP test client: connects to localhost:5000 and sends the same UTF-8 line every
/// two seconds until the connection breaks.
/// </summary>
internal class Program
{
    /// <summary>
    /// Connects to the server, runs the send loop and waits for a key press before exiting.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        // Dispose the client (and with it the network stream) when Main exits.
        using var client = new TcpClient();
        await client.ConnectAsync("localhost", 5000);
        var stream = client.GetStream();
        await ProcessLinesAsync(stream);
        Console.ReadKey();
    }

    /// <summary>
    /// Writes one newline-terminated message to <paramref name="stream"/> every two
    /// seconds; returns once a write fails because the connection was lost.
    /// </summary>
    /// <param name="stream">Connected network stream to write to.</param>
    async static Task ProcessLinesAsync(NetworkStream stream)
    {
        // The payload never changes, so encode it once instead of on every iteration.
        var str = "一个商户可以关联多个小程序,一个小程序也可以绑定多个商户。对应日常开发,在多商户的情况,每一个商户号在后台都会进行配置,存入数据库,支付时根据当前不同的账户决定使用那一个商户号,这一块需要后台对每一个AppId和商户号做关联,支付时才能确认具体使用那个商户号。Hello world\n";
        var bytes = Encoding.UTF8.GetBytes(str);

        while (true)
        {
            try
            {
                await stream.WriteAsync(bytes, 0, bytes.Length);
            }
            catch (IOException ex)
            {
                // The server went away; stop sending instead of crashing the process.
                Console.WriteLine($"Connection lost: {ex.Message}");
                break;
            }

            // Task.Delay waits without blocking a thread, unlike Thread.Sleep.
            await Task.Delay(2000);
        }
    }
}
标签:Task,socket,buffer,await,static,var
From: https://www.cnblogs.com/readafterme/p/18424096