服务端
工作需要又需要用到socketTCP通讯,这么多年了,终于稍微能写点了。让我说其实也说不出个啥来,看了很多的异步后稍微对异步socket的导流 endreceive后 再beginreceive 形成一个内循环有了个认识,加上我自己的封包拆包机制,然后再仿那些其它的大多数代码结构弄点onReceive事件进行 收包触发。整个过程就算差不多了 ,基本上是能够可靠运行的 靠谱的 中规中矩的,要说啥创新读到嘛真的谈不上。代码中写了很多low逼注释 也是为了方便自己理解 请无视。下面是server端代码,使用异步机制accept 异步receive ,成员有 clients代表当前在线的客户端 客户端socket包装为EndpointClient ,有onClientAddDel 代表客户端上线掉线事件,有onReceive代表所有客户端的收包事件,clients由于是异步的多线程访问就要涉及多线程管控 所以使用lock ,服务端有sendToAll() 和SendToSomeOne()毫无疑问这也是通过调用特定的clients来做的。
以下是服务端代码
1 public class MsgServerSchedule 2 { 3 4 5 Socket serverSocket; 6 public Action<List<string>> onClientAddDel; 7 public Action<Telegram_Base> onReceive; 8 bool _isRunning = false; 9 10 11 int port; 12 13 public TelgramType telType; 14 15 static List<EndpointClient> clients; 16 17 public bool isRunning { get { return _isRunning; } } 18 public MsgServerSchedule(int _port) 19 { 20 //any 就决定了 ip地址格式是v4 21 //IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 7654); 22 //socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 23 24 this.port = _port; 25 26 clients = new List<EndpointClient>(); 27 28 Console.WriteLine("constructor"); 29 30 } 31 32 public void Start() 33 { 34 try 35 { 36 IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port); 37 serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 38 serverSocket.Bind(endPoint); 39 serverSocket.Listen(port); 40 41 serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), serverSocket); 42 43 _isRunning = true; 44 Console.WriteLine("start"); 45 } 46 catch (Exception ex) 47 { 48 _isRunning = false; 49 serverSocket = null; 50 51 Console.WriteLine("服务启动出现错误,可能端口已被占用:"+port); 52 Console.WriteLine(ex.Message); 53 } 54 55 } 56 57 public void Stop() 58 { 59 for (int i = 0; i < clients.Count; i++) 60 { 61 clients[i].Close(); 62 } 63 ClientAddDelGetList(null, EndPointClientsChangeType.ClearAll); 64 serverSocket.Close(); 65 _isRunning = false; 66 } 67 68 public void SendToAll(Telegram_Base tel) 69 { 70 for (int i = 0; i < clients.Count; i++) 71 { 72 clients[i].Send(tel); 73 } 74 } 75 76 public void SendToSomeOne(Telegram_Base tel) 77 { 78 for (int i = 0; i < clients.Count; i++) 79 { 80 if(clients[i].remoteIPPort==tel.remoteIPPort) 81 { 82 clients[i].Send(tel); 83 break; 84 } 85 } 86 } 87 88 //新增与删除客户端 秉持原子操作 89 List<string> ClientAddDelGetList(EndpointClient cli, EndPointClientsChangeType changeType) 90 { 91 //异步同时有新客户端上线 与下线 不进行资源互斥访问会报错 92 lock (this) 93 { 94 if (changeType == EndPointClientsChangeType.Add) 95 { 96 clients.Add(cli); 97 } 98 else if(changeType== EndPointClientsChangeType.Del) 99 { 100 var beRemoveClient = clients.First(r => r.remoteIPPort == cli.remoteIPPort); 101 if (beRemoveClient != null) 102 clients.Remove(beRemoveClient); 103 } 104 else if(changeType== EndPointClientsChangeType.ClearAll) 105 { 106 clients.Clear(); 107 } 108 else if (changeType == EndPointClientsChangeType.GetAll) 109 { 110 List<string> onLines = new List<string>(clients.Count); 111 for (int i = 0; i < clients.Count; i++) 112 { 113 onLines.Add(clients[i].remoteIPPort); 114 } 115 return onLines; 116 } 117 else 118 { 119 return null; 120 } 121 } 122 return null; 123 } 124 //异步监听客户端 有客户端到来时的回调 125 private void AcceptCallback(IAsyncResult iar) 126 { 127 //server端一直在receive 能够感知到客户端掉线 (连上后 关闭客户端 server立即有错误爆出) 128 //但是同情况 关闭server端 客户端无错误爆出 直到点发送 才有错误爆出 129 //由此得出 处于receive才会有掉线感知 ,send时发现发不出去自然也会有感知 跟人的正常思维理解是一样的 130 //虽然tcp是所谓的长连接 ,通过反复测试 ->但是双方相互都处在一个静止状态 是无法 确定在不在的 131 //连上后平常的情况下 并没有数据流通 的 ,双方只是一个状态的保持而已。 132 //这也是为什么 好多服务 客户端 程序 都有个心跳机制(由此我们可以想到继续完善 弄个客户端列表 心跳超时的剔除列表 正常发消息send 或receive 异常的剔除列表 删除clientSocket 133 //其实非要说吧 一般情况 服务端一直在receive 不用心跳其实也是可以的(客户端可能是真的需要 134 //tcp底层就已经有了一个判断对方在不在的机制 , 对方直接关程序 结束进程 这些 只要tcp在receive就立即能够感知 所以说心跳 用不用看情况吧 135 136 //tcp 不会丢包 哪怕是连接建立了 你还没开始receive 对方却先发了, 137 //对方只要是发了的数据 都由操作系统像个缓存样给你放那的 不会掉 你再隔10秒开始receive都能rec的到 138 139 //tcp甚至在拔掉网线 再重新插上 都可以保证数据一致性 140 //tcp的包顺序能够保证 先发的先到 141 142 //nures代码中那些beginreceivexxx 异步receive的核心机制就是 ,假定数据到的时候把数据保存到xxx数组 143 //真正endreceive的时候 其实数据已经接收 处理完成了 144 145 try 146 { 147 148 //处理完当前accept 149 Socket currentSocket = serverSocket.EndAccept(iar); 150 151 EndpointClient client = new EndpointClient(currentSocket,telType); 152 153 //新增客户端 154 ClientAddDelGetList(client, EndPointClientsChangeType.Add); 155 156 if (onClientAddDel != null) 157 { 158 List<string> onlines = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll); 159 onClientAddDel(onlines); 160 161 //客户端异常掉线 162 client.onClientDel = new Action<string>((_remoteIPPort) => 163 { 164 ClientAddDelGetList(new EndpointClient(){ remoteIPPort=_remoteIPPort} , EndPointClientsChangeType.Del); 165 166 List<string> onlines2 = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll); 167 onClientAddDel(onlines2); 168 }); 169 } 170 171 172 173 //这句到时调用完成后 就会自动把 receivebuffer填充 //要接收的字节数 系统底层操作一次接收多少字节 其实意义不大 174 //总是从0开始(就是说并发时会覆盖 175 176 Console.WriteLine(string.Format("new client ->{0}", currentSocket.RemoteEndPoint.ToString())); 177 178 //currentSocket.Close(); 179 //Application.Exit(); 180 181 //Thread.Sleep(1000 * 10); 182 client.onReceive += this.onReceive; 183 184 client.BeginReceive(); 185 186 187 //立即开始accept新的客户端 188 if (isRunning == true && serverSocket != null) 189 serverSocket.BeginAccept(AcceptCallback, serverSocket); 190 //beginAccept 最开始的方法可以不一样 ,但最终肯定是一个不断accept的闭环过程 191 //整个过程就像个导流样 ,最开始用异步导流到一个固定的点 然后让其循环源源不断运转 192 193 //加asynccallback 有什么不一样么 194 //socket.BeginAccept(new AsyncCallback( AcceptCallback), socket); 195 196 } 197 catch (Exception ex) 198 { 199 Console.WriteLine("AcceptCallback Error"); 200 Console.WriteLine(ex.Message); 201 } 202 203 } 204 205 206 }
EndpointClient终端代码代表客户端的对口人,他的onReceive 等资源从服务端继承 ,如果服务端想给某个特定客户端发数据则会调用他们中的某一个 毫无疑问这是通过remoteIPport来判断的,这些都是编写基本socket结构轻车熟路的老套路
以下EndpointClient代码
1 public class EndpointClient 2 { 3 Socket workingSocket; 4 static int receiveBufferLenMax = 5000; 5 byte[] onceReadDatas = new byte[receiveBufferLenMax]; 6 List<byte> receiveBuffer = new List< byte>(receiveBufferLenMax); 7 8 public string remoteIPPort { get; set; } 9 10 //当前残留数据区 接收数据的起始指针(也代表缓冲区数据长度 11 int receiveBufferLen = 0; 12 13 14 TelgramType telType; 15 16 public Action<Telegram_Base> onReceive; 17 public Action<string> onClientDel; 18 19 public EndpointClient() 20 { 21 22 } 23 public EndpointClient(Socket _socket,TelgramType _telType) 24 { 25 this.remoteIPPort = _socket.RemoteEndPoint.ToString(); 26 this.telType = _telType; 27 workingSocket = _socket; 28 } 29 30 public void Send(Telegram_Base tel) 31 { 32 //try 33 //{ 34 if(workingSocket==null) 35 { 36 Console.WriteLine("未初始化的EndpointClient"); 37 return; 38 } 39 if (tel is Telegram_Schedule) 40 { 41 Telegram_Schedule telBeSend = tel as Telegram_Schedule; 42 if (telBeSend.dataBytes.Length != telBeSend.dataLen) 43 { 44 Console.WriteLine("尝试发送数据长度格式错误的报文"); 45 return; 46 } 47 48 byte[] sendBytesHeader = telBeSend.dataBytesHeader; 49 byte[] sendbytes = telBeSend.dataBytes; 50 51 //数据超过缓冲区长度 会导致无法拆包 52 if (sendbytes.Length <= receiveBufferLenMax) 53 { 54 workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null); 55 workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0,null,null 56 57 ); 58 } 59 else 60 { 61 Console.WriteLine("发送到调度客户端的数据超过缓冲区长度"); 62 throw new Exception("发送到调度客户端的数据超过缓冲区长度"); 63 } 64 65 } 66 else if (tel is Telegram_SDBMsg) 67 { 68 69 } 70 71 //} 72 //catch (Exception ex) 73 //{ 74 75 // Console.WriteLine(ex.Message); 76 // throw ex; 77 //} 78 } 79 80 public void BeginReceive() 81 { 82 if (workingSocket == null) 83 { 84 Console.WriteLine("未初始化的EndpointClient"); 85 return; 86 } 87 88 receiveBufferLen = 0; 89 workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None, 90 ReceiveCallback, 91 this); 92 } 93 private void ReceiveCallback(IAsyncResult iar) 94 { 95 try 96 { 97 EndpointClient cli = (EndpointClient)iar.AsyncState; 98 Socket socket = cli.workingSocket; 99 int reads = socket.EndReceive(iar); 100 101 if (reads > 0) 102 { 103 104 for (int i = 0; i < reads; i++) 105 { 106 receiveBuffer.Add(onceReadDatas[i]); 107 } 108 109 //具体填充了多少看返回值 此时 数据已经在buffer中了 110 receiveBufferLen += reads; 111 //加完了后解析 阻塞式处理 结束后开启新的接收 112 SloveTelData(); 113 114 if (receiveBufferLenMax - receiveBufferLen > 0) 115 { 116 //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收) 117 socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this); 118 } 119 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了 120 { 121 Close(); 122 //移除自己 123 if (onClientDel != null) 124 { 125 onClientDel(remoteIPPort); 126 } 127 Console.WriteLine("服务端接口解析数据出现异常"); 128 throw new Exception("服务端接口解析数据出现异常"); 129 } 130 } 131 else//reads==0 客户端已关闭 132 { 133 Close(); 134 //移除自己 135 if (onClientDel != null) 136 { 137 onClientDel(remoteIPPort); 138 } 139 } 140 } 141 catch (Exception ex) 142 { 143 Close(); 144 //移除自己 145 if (onClientDel != null) 146 { 147 onClientDel(remoteIPPort); 148 } 149 150 Console.WriteLine("ReceiveCallback Error"); 151 Console.WriteLine(ex.Message); 152 } 153 154 } 155 void SloveTelData() 156 { 157 //进行数据解析 158 SloveTelDataUtil slo = new SloveTelDataUtil(); 159 160 if (telType == TelgramType.Schedule) 161 { 162 List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen, this.remoteIPPort); 163 //buffer已经被处理一遍了 使用新的长度 164 receiveBufferLen = receiveBuffer.Count; 165 //解析出的每一个对象都触发 onreceive 166 for (int i = 0; i < dataEntitys.Count; i++) 167 { 168 if (onReceive != null) 169 onReceive(dataEntitys[i]); 170 } 171 } 172 else if (telType == TelgramType.SDBMsg) 173 { 174 175 } 176 177 } 178 179 180 public void Close() 181 { 182 try 183 { 184 receiveBuffer.Clear(); 185 receiveBufferLen = 0; 186 if (workingSocket != null && workingSocket.Connected) 187 workingSocket.Close(); 188 } 189 catch (Exception ex) 190 { 191 Console.WriteLine(ex.Message); 192 } 193 194 } 195 }
数据拆包与封包粘包处理
上面的代码可以看到 数据包处理都在receiveCallback里 SloveTelData,也是通用的套路 ,解析到完整的包后从缓冲区移除 解析多少个包触发多少次事件,残余数据留在缓冲区 然后继续开始新的beginReceive往缓冲区加。在异步机制中 到达endReceive的时候数据已经在缓冲区里了,这个自不用多说噻。数据包和粘包逻辑在公共类库里供客户端服务端共同调用
以下是粘包处理逻辑
1 public class SloveTelDataUtil 2 { 3 List<Telegram_Schedule> solveList; 4 public SloveTelDataUtil() 5 { 6 } 7 8 List<byte> buffer; 9 int bufferLen; 10 int bufferIndex = 0; 11 string remoteIPPort; 12 public List<Telegram_Schedule> Slove_Telegram_Schedule( List< byte> _buffer,int _bufferLen,string _remoteIPPort) 13 { 14 15 solveList = new List<Telegram_Schedule>(); 16 17 bufferIndex = 0; 18 19 buffer = _buffer; 20 bufferLen = _bufferLen; 21 remoteIPPort = _remoteIPPort; 22 23 //小于最小长度 直接返回 24 if (bufferLen < 12) 25 return solveList; 26 27 //进行数据解析 28 bool anaysisOK = false; 29 while (anaysisOK=AnaysisData_Schedule()==true)//直到解析的不能解析为止 30 { 31 } 32 return solveList; 33 } 34 35 public bool AnaysisData_Schedule() 36 { 37 if (bufferLen - bufferIndex < GlobalSymbol.Headerlen) 38 return false; 39 40 //解析出一个数据对象 41 Telegram_Schedule telObj = new Telegram_Schedule(); 42 43 //必定是大于最小数据大小的 44 telObj.dataBytesHeader = new byte[GlobalSymbol.Headerlen]; 45 buffer.CopyTo(bufferIndex, telObj.dataBytesHeader, 0, GlobalSymbol.Headerlen); 46 47 byte[] btsHeader = new byte[4]; 48 byte[] btsCommand = new byte[4]; 49 byte[] btsLen = new byte[4]; 50 51 btsHeader[0] = buffer[bufferIndex]; 52 btsHeader[1] = buffer[bufferIndex+1]; 53 btsHeader[2] = buffer[bufferIndex+2]; 54 btsHeader[3] = buffer[bufferIndex+3]; 55 56 bufferIndex += 4; 57 58 btsCommand[0] = buffer[bufferIndex]; 59 btsCommand[1] = buffer[bufferIndex + 1]; 60 btsCommand[2] = buffer[bufferIndex + 2]; 61 btsCommand[3] = buffer[bufferIndex + 3]; 62 63 bufferIndex += 4; 64 65 btsLen[0] = buffer[bufferIndex]; 66 btsLen[1] = buffer[bufferIndex + 1]; 67 btsLen[2] = buffer[bufferIndex + 2]; 68 btsLen[3] = buffer[bufferIndex + 3]; 69 70 bufferIndex += 4; 71 72 73 74 int dataLen = BitConverter.ToInt32(btsLen, 0); 75 telObj.header = BitConverter.ToUInt32(btsHeader, 0); 76 telObj.command = BitConverter.ToInt32(btsCommand, 0); 77 telObj.remoteIPPort = remoteIPPort; 78 79 if(dataLen>0) 80 { 81 //数据区小于得到的数据长度 说明数据部分还没接收到 不删除缓冲区 不做任何处理 82 //下次来了连着头一起解析 83 if (bufferLen - GlobalSymbol.Headerlen < dataLen) 84 { 85 86 bufferIndex -= 12;// 87 88 89 return false; 90 91 } 92 else 93 { 94 95 telObj.dataLen = dataLen; 96 telObj.dataBytes = new byte[dataLen]; 97 buffer.CopyTo(bufferIndex, telObj.dataBytes, 0, dataLen); 98 99 solveList.Add(telObj); 100 //bufferIndex += dataLen; 101 102 //解析成功一次 移除已解析的 103 for (int i = 0; i < GlobalSymbol.Headerlen+dataLen; i++) 104 { 105 buffer.RemoveAt(0); 106 } 107 bufferIndex = 0; 108 bufferLen = buffer.Count; 109 return true; 110 } 111 } 112 else 113 { 114 115 telObj.dataLen = 0; 116 solveList.Add(telObj); 117 //bufferIndex += 0; 118 //解析成功一次 移除已解析的 119 for (int i = 0; i < GlobalSymbol.Headerlen; i++) 120 { 121 buffer.RemoveAt(0); 122 } 123 //解析成功一次因为移除了缓冲区 bufferIndex置0 124 bufferIndex = 0; 125 bufferLen = buffer.Count; 126 return true; 127 } 128 129 } 130 131 132 public List<Telegram_SDBMsg> Slove_Telegram_SDBMsg(ref byte[] buffer) 133 { 134 return new List<Telegram_SDBMsg>(); 135 } 136 }
我们看到用到的数据包对象是Telegram_Schedule ,中间保存有报文数据,数据发送的目标等信息。
以下是数据包结构代码
1 public class Telegram_Base 2 { 3 public string remoteIPPort { get; set; } 4 //数据内容 5 public byte[] dataBytes { get; set; } 6 //头部内容的序列化 7 public byte[] dataBytesHeader { get; set; } 8 9 public string jsonStr { get; set; } 10 virtual public void SerialToBytes() 11 { 12 13 } 14 15 virtual public void SloveToTel() 16 { 17 18 } 19 20 } 21 22 public class Telegram_Schedule:Telegram_Base 23 { 24 25 //头部标识 4字节 26 public UInt32 header { get; set; } 27 //命令对应枚举的 int 4字节 28 public int command { get; set; } 29 //数据长度 4字节 30 public int dataLen { get; set; } 31 32 33 34 override public void SerialToBytes() 35 { 36 //有字符串数据 但是待发送字节是空 37 if ((string.IsNullOrEmpty(jsonStr) == false ))//&& (dataBytes==null || dataBytes.Length==0) 38 { 39 dataBytes = Encoding.UTF8.GetBytes(jsonStr); 40 dataLen = dataBytes.Length; 41 dataBytesHeader = new byte[GlobalSymbol.Headerlen]; 42 43 header = GlobalSymbol.HeaderSymbol; 44 45 byte[] btsHeader = BitConverter.GetBytes(header); 46 byte[] btsCommand = BitConverter.GetBytes(command); 47 byte[] btsLen = BitConverter.GetBytes(dataLen); 48 49 Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4); 50 Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4); 51 Array.Copy(btsLen, 0, dataBytesHeader, 8, 4); 52 53 } 54 else if((string.IsNullOrEmpty(jsonStr) == true )&& (dataBytes==null || dataBytes.Length==0)){ 55 dataLen = 0; 56 dataBytes = new byte[0]; 57 58 dataBytesHeader = new byte[GlobalSymbol.Headerlen]; 59 60 header = GlobalSymbol.HeaderSymbol; 61 62 byte[] btsHeader = BitConverter.GetBytes(header); 63 byte[] btsCommand = BitConverter.GetBytes(command); 64 byte[] btsLen = BitConverter.GetBytes(dataLen); 65 66 Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4); 67 Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4); 68 Array.Copy(btsLen, 0, dataBytesHeader, 8, 4); 69 } 70 } 71 72 override public void SloveToTel() 73 { 74 //只解析字符串数据部分 ,header 和len 在接收之初就已解析 75 this.jsonStr = Encoding.UTF8.GetString(this.dataBytes); 76 } 77 78 }
客户端代码
最后是客户端,有了上面的结构,客户端就不足为谈了,稍微了解socket的人都熟知套路的 基本跟EndpointClient一致
1 public class MsgClientSchedule 2 { 3 Socket workingSocket; 4 //缓冲区最大数据长度 5 static int receiveBufferLenMax = 5000; 6 //单次receive数据(取决于tcp底层封包 但是不会超过缓冲区最大长度 7 byte[] onceReadDatas = new byte[receiveBufferLenMax]; 8 //未解析到完整数据包时的残余数据保存区 9 List<byte> receiveBuffer = new List<byte>(receiveBufferLenMax); 10 11 string serverIP { get; set; } 12 int serverPort { get; set; } 13 public string localIPPort { get; set; } 14 15 //残余缓冲区数据长度 16 int receiveBufferLen = 0; 17 18 bool _isConnected { get; set; } 19 20 TelgramType telType; 21 22 //收一个包时触发 23 public Action<Telegram_Base> onReceive; 24 //与服务端断链时触发 25 public Action<string> onClientDel; 26 27 28 public bool isConnected { get { return _isConnected; } } 29 public MsgClientSchedule(string _serverIP,int _port) 30 { 31 serverIP = _serverIP; 32 serverPort = _port; 33 _isConnected = false; 34 } 35 36 public void Connect() 37 { 38 try 39 { 40 workingSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP); 41 IPEndPoint ipport = new IPEndPoint(IPAddress.Parse(serverIP), serverPort); 42 workingSocket.Connect(ipport); 43 44 localIPPort = workingSocket.LocalEndPoint.ToString(); 45 _isConnected = true; 46 BeginReceive(); 47 } 48 catch (Exception ex) 49 { 50 workingSocket = null; 51 _isConnected = false; 52 53 Console.WriteLine(ex.Message); 54 } 55 56 } 57 58 59 60 61 public void Send(Telegram_Base tel) 62 { 63 try 64 { 65 if(_isConnected==false) 66 { 67 Console.WriteLine("未连接到服务器"); 68 return; 69 } 70 if (tel is Telegram_Schedule) 71 { 72 Telegram_Schedule telBeSend = tel as Telegram_Schedule; 73 if (telBeSend.dataBytes.Length != telBeSend.dataLen) 74 { 75 Console.WriteLine("尝试发送数据长度格式错误的报文"); 76 return; 77 } 78 byte[] sendBytesHeader = telBeSend.dataBytesHeader; 79 byte[] sendbytes = telBeSend.dataBytes; 80 81 //数据超过缓冲区长度 会导致无法拆包 82 if (sendbytes.Length <= receiveBufferLenMax) 83 { 84 workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null); 85 workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0, null, null 86 87 ); 88 } 89 else 90 { 91 Console.WriteLine("发送到调度客户端的数据超过缓冲区长度"); 92 throw new Exception("发送到调度客户端的数据超过缓冲区长度"); 93 } 94 95 96 } 97 else if (tel is Telegram_SDBMsg) 98 { 99 100 } 101 102 } 103 catch (Exception ex) 104 { 105 Close(); 106 Console.WriteLine(ex.Message); 107 //throw ex; 108 } 109 } 110 111 public void BeginReceive() 112 { 113 receiveBufferLen = 0; 114 workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None, 115 ReceiveCallback, 116 117 this); 118 } 119 private void ReceiveCallback(IAsyncResult iar) 120 { 121 try 122 { 123 MsgClientSchedule cli = (MsgClientSchedule)iar.AsyncState; 124 Socket socket = cli.workingSocket; 125 int reads = socket.EndReceive(iar); 126 127 if (reads > 0) 128 { 129 130 for (int i = 0; i < reads; i++) 131 { 132 receiveBuffer.Add(onceReadDatas[i]); 133 } 134 135 //具体填充了多少看返回值 此时 数据已经在buffer中了 136 137 receiveBufferLen += reads; 138 139 //加完了后解析 阻塞式处理 结束后开启新的接收 140 SloveTelData(); 141 142 143 144 if (receiveBufferLenMax - receiveBufferLen > 0) 145 { 146 //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收) 147 socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this); 148 } 149 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了 150 { 151 Close(); 152 153 Console.WriteLine("服务端接口解析数据出现异常"); 154 throw new Exception("服务端接口解析数据出现异常"); 155 } 156 } 157 else//reads==0客户端已关闭 158 { 159 Close(); 160 } 161 } 162 catch (Exception ex) 163 { 164 Close(); 165 166 Console.WriteLine("ReceiveCallback Error"); 167 Console.WriteLine(ex.Message); 168 } 169 170 } 171 private void SloveTelData() 172 { 173 174 //进行数据解析 175 SloveTelDataUtil slo = new SloveTelDataUtil(); 176 177 if (telType == TelgramType.Schedule) 178 { 179 List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen,serverIP+":"+serverPort.ToString()); 180 //buffer已经被处理一遍了 使用新的长度 181 receiveBufferLen = receiveBuffer.Count; 182 //解析出的每一个对象都触发 onreceive 183 for (int i = 0; i < dataEntitys.Count; i++) 184 { 185 if (onReceive != null) 186 onReceive(dataEntitys[i]); 187 } 188 } 189 else if (telType == TelgramType.SDBMsg) 190 { 191 192 } 193 194 } 195 196 197 public void Close() 198 { 199 try 200 { 201 _isConnected = false; 202 203 receiveBuffer.Clear(); 204 receiveBufferLen = 0; 205 if (workingSocket != null && workingSocket.Connected) 206 workingSocket.Close(); 207 } 208 catch (Exception ex) 209 { 210 Console.WriteLine(ex.Message); 211 } 212 213 } 214 215 }
服务端调用
构建一个winform基本项目
1 List<string> clients; 2 MsgServerSchedule server; 3 private void btn_start_Click(object sender, EventArgs e) 4 { 5 server = new MsgServerSchedule(int.Parse(tbx_port.Text)); 6 7 8 server.Start(); 9 if (server.isRunning == true) 10 { 11 btn_start.Enabled = false; 12 13 server.onReceive += new Action<Telegram_Base>( 14 (tel) => 15 { 16 this.BeginInvoke(new Action(() => 17 { 18 if (tel is Telegram_Schedule) 19 { 20 Telegram_Schedule ts = tel as Telegram_Schedule; 21 ts.SloveToTel(); 22 Console.WriteLine(string.Format("commandType:{0}", ((ScheduleTelCommandType)ts.command).ToString())); 23 24 tbx_msgs.Text += ts.remoteIPPort + ">" + ts.jsonStr + "\r\n"; 25 26 //数据回发测试 27 string fromip = ts.remoteIPPort; 28 string srcMsg = ts.jsonStr; 29 string fromServerMsg = ts.jsonStr + " -from server"; 30 ts.jsonStr = fromServerMsg; 31 32 33 //如果消息里有指向信息 则转送到对应的客户端 34 if (clients != null) 35 { 36 string to = null; 37 for (int i = 0; i < clients.Count; i++) 38 { 39 if (srcMsg.Contains(clients[i])) 40 { 41 to = clients[i]; 42 break; 43 } 44 } 45 46 if (to != null) 47 { 48 ts.remoteIPPort = to; 49 string toMsg; 50 //toMsg= srcMsg.Replace(to, ""); 51 toMsg = srcMsg.Replace(to, fromip); 52 ts.jsonStr = toMsg; 53 ts.SerialToBytes(); 54 55 server.SendToSomeOne(ts); 56 } 57 else 58 { 59 ts.SerialToBytes(); 60 server.SendToSomeOne(ts); 61 } 62 } 63 } 64 })); 65 66 } 67 ); 68 69 server.onClientAddDel += new Action<List<string>>((onlines) => 70 { 71 this.BeginInvoke( 72 new Action(() => 73 { 74 clients = onlines; 75 listbox_clients.Items.Clear(); 76 77 for (int i = 0; i < onlines.Count; i++) 78 { 79 listbox_clients.Items.Add(onlines[i]); 80 } 81 })); 82 }); 83 } 84 } 85 private void btn_sendAll_Click(object sender, EventArgs e) 86 { 87 Telegram_Schedule tel = new Telegram_Schedule(); 88 tel.header = GlobalSymbol.HeaderSymbol; 89 tel.command = (int)ScheduleTelCommandType.StartC2S; 90 tel.jsonStr = tbx_sendAll.Text; 91 tel.SerialToBytes(); 92 93 server.SendToAll(tel); 94 }
客户端调用
1 MsgClientSchedule client; 2 3 private void btn_start_Click(object sender, EventArgs e) 4 { 5 client = new MsgClientSchedule(tbx_ip.Text, int.Parse(tbx_port.Text)); 6 7 client.Connect(); 8 9 if (client.isConnected == true) 10 { 11 btn_start.Enabled = false; 12 13 label1.Text = client.localIPPort; 14 15 client.onReceive = new Action<Telegram_Base>((tel) => 16 { 17 this.BeginInvoke( 18 new Action(() => 19 { 20 tel.SloveToTel(); 21 tbx_rec.Text += tel.jsonStr + "\r\n"; 22 23 })); 24 }); 25 } 26 27 } 28 29 30 31 private void btn_send_Click(object sender, EventArgs e) 32 { 33 34 if (client == null || client.isConnected == false) 35 return; 36 37 //for (int i = 0; i < 2; i++) 38 //{ 39 Telegram_Schedule tel = new Telegram_Schedule(); 40 tel.command = (int)ScheduleTelCommandType.MsgC2S; 41 42 tel.jsonStr = tbx_remoteip.Text+">"+ tbx_msgSend.Text; 43 tel.SerialToBytes();//发出前要先序列化 44 45 client.Send(tel); 46 //} 47 48 }
实现效果
可以多客户端连接互相自由发送消息,服务端可以编写转发规则代码,那些什么棋牌啊 互动白板 以及其他类似的应用就可以基于此之上发挥想象了
标签:tel,socket,int,clients,通信,简易,new,byte,public From: https://www.cnblogs.com/assassinx/p/17154167.html