With the priority queue implemented in the previous part, we can now turn to how the HSMS protocol handles connections. Among the interfaces defined earlier is IConnection, and this part implements it. The software can act either as the Host side, actively connecting to the equipment, or as the equipment side, letting other systems connect to it, so connections come in two types. The two implementations of IConnection share a fair amount of code, however, so we implement the shared part first and the parts that differ afterwards.
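As a refresher, IConnection was defined in an earlier installment. Reconstructed from the methods implemented below, a minimal sketch of it (the exact definition in package secsgem may differ) looks roughly like this:

type IConnection interface {
	Enable() error
	Disable() error
	GetConfig() *ConnectionCfg
	SetConfig(cfg *ConnectionCfg)
	SendPacket(packet interface{}, priority int) error
	GetCtx() context.Context
	GetView() interface{}
	Status() Status
}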
1. The shared part
package connection

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"net"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	packets "secs-gem/hsms/packet"
	"secs-gem/hsms/queue"
	driver_log "secs-gem/log"
	"secs-gem/secsgem"
)
const (
	SendBlockSize int = 1024 * 1024 // maximum number of bytes per write to the socket
)
/*-----------------------------------
Connection
-----------------------------------*/
func NewConnection(cfg *secsgem.ConnectionCfg, delegate secsgem.IDelegate) secsgem.IConnection {
	var sc secsgem.IConnection
	if cfg.Active {
		sc = &HsmsActiveConnection{
			HsmsConnection: newHsmsConnection(cfg, delegate),
		}
	} else {
		sc = &HsmsPassiveConnection{
			HsmsConnection: newHsmsConnection(cfg, delegate),
		}
	}
	return sc
}
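// Usage sketch (hypothetical caller, not part of this package): the caller
// only ever sees the secsgem.IConnection interface, and cfg.Active selects
// the concrete type behind it:
//
//	conn := connection.NewConnection(cfg, delegate)
//	if err := conn.Enable(); err != nil {
//		// handle the error
//	}
//	defer conn.Disable()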
func newHsmsConnection(cfg *secsgem.ConnectionCfg, delegate secsgem.IDelegate) *HsmsConnection {
	sc := &HsmsConnection{
		view:      &secsgem.ConnView{},
		delegate:  delegate,
		recvChan:  make(chan *packets.HsmsPacket, 1024),
		connected: false,
		log:       zap.S(),
	}
	// apply the configuration
	sc.SetConfig(cfg)
	return sc
}
type HsmsConnection struct {
	ctx       context.Context
	ctxCancel context.CancelFunc
	config    *secsgem.ConnectionCfg
	view      *secsgem.ConnView
	// callbacks for connection established / closed / packet received
	delegate secsgem.IDelegate
	sock     *net.TCPConn
	status   secsgem.Status
	// receive buffer
	receiveBuffer bytes.Buffer
	// channel for received packets
	recvChan chan *packets.HsmsPacket
	// send queue
	sendQueue *queue.PriorityQueue
	// whether the connection is established
	connected bool
	// logger
	log   *zap.SugaredLogger
	mx    sync.RWMutex
	group errgroup.Group
}
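// Data flow through these fields: bytes read from sock accumulate in
// receiveBuffer, complete frames are decoded and pushed into recvChan, and
// handleRevMessageLoop hands them to the delegate; outgoing packets enter
// sendQueue and senderLoop writes them to sock.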
func (sc *HsmsConnection) GetConfig() *secsgem.ConnectionCfg {
	return sc.config
}
func (sc *HsmsConnection) SetConfig(cfg *secsgem.ConnectionCfg) {
	if sc.view.Enabled.Load() {
		// the config cannot be changed while the connection is enabled
		return
	}
	sc.mx.Lock()
	defer sc.mx.Unlock()
	cfg.ReviseDuration() // normalize duration precision
	sc.config = cfg
	// attach port and active fields to the logger; every log line written
	// through sc.log from now on carries these fields
	sc.log = driver_log.S().With("port", cfg.GetPort(), "active", cfg.Active)
}
func (sc *HsmsConnection) disable() error {
	if sc.ctx != nil && sc.ctx.Err() == nil {
		sc.ctxCancel()
	}
	// close the socket
	if sc.sock != nil {
		sc.sock.Close()
	}
	// wait for all goroutines started via the errgroup to exit
	sc.group.Wait()
	return nil
}
func (sc *HsmsConnection) GetCtx() context.Context {
	return sc.ctx
}
// SendPacket queues a packet for sending
func (sc *HsmsConnection) SendPacket(packet interface{}, priority int) error {
	pkt, ok := packet.(*packets.HsmsPacket)
	if !ok {
		return fmt.Errorf("packet type error %T", packet)
	}
	if !sc.connected {
		return fmt.Errorf("disconnected")
	}
	priority |= secsgem.PriorityNORMAL
	// enqueue into the priority queue
	systemId := pkt.Header.System
	sc.log.Debugf("[system=0x%x] into send_queue", systemId)
	sc.sendQueue.PutNoWait(&queue.QueueItem{
		Value:    pkt,
		Priority: priority,
		JoinTime: time.Now(),
	})
	return nil
}
// _sendPacket writes an encoded packet to the socket, splitting it into blocks
func (sc *HsmsConnection) _sendPacket(sock *net.TCPConn, pkt *packets.HsmsPacket) error {
	systemId := pkt.Header.System
	data := pkt.Encode()
	length := len(data)
	sc.log.Debugf("[system=0x%x] send packet %s", systemId, pkt)
	for i := 0; i < length; i += SendBlockSize {
		start := i
		end := i + SendBlockSize
		if end > length {
			end = length
		}
		block := data[start:end]
		retryCount := 5
		retry := true
		for retry && retryCount > 0 {
			retryCount -= 1
			sc.log.Debugf("[system=0x%x][send] %v", systemId, block)
			//secsgem.MetricHsmsSendTotal.Add(float64(len(block)))
			_, err := sock.Write(block)
			if err != nil {
				errMsg := err.Error()
				sc.log.Warn("socket conn send data failed, err: ", errMsg)
				// a closed connection is not recoverable, give up immediately
				if strings.Contains(errMsg, "use of closed network connection") {
					return err
				}
				continue
			}
			retry = false
		}
	}
	return nil
}
// startReceiverAndSender starts the receiver, sender and message-handler goroutines
func (sc *HsmsConnection) startReceiverAndSender() {
	sc.connected = true
	// working context for this connection's goroutines
	_ctx, _ctxCancel := context.WithCancel(sc.ctx)
	// receiver goroutine
	sc.group.Go(func() error {
		defer func() {
			if e := recover(); e != nil {
				sc.log.Errorf("[panic]%v\n%s", e, debug.Stack())
			}
		}()
		sc.receiverLoop(_ctx, _ctxCancel)
		return nil
	})
	// sender goroutine
	sc.group.Go(func() error {
		defer func() {
			if e := recover(); e != nil {
				sc.log.Errorf("[panic]%v\n%s", e, debug.Stack())
			}
		}()
		sc.senderLoop(_ctx)
		return nil
	})
	// received-message handler goroutine
	sc.group.Go(func() error {
		sc.handleRevMessageLoop(_ctx)
		return nil
	})
	// notify the delegate that the connection is established
	if sc.delegate != nil {
		sc.delegate.OnConnectionEstablished()
	}
}
func (sc *HsmsConnection) receiverLoop(ctx context.Context, cancel context.CancelFunc) {
	defer func() {
		sc.view.IsReceiving = false
	}()
	sc.view.IsReceiving = true
	if err := sc._receiverReadData(ctx); err != nil {
		sc.log.Warnf("receiver error: %s", err)
	}
	// close the socket
	sc.sock.Close()
	// cancel the working context so the sender and handler loops exit too
	cancel()
	// notify the delegate that the connection is closed
	if sc.delegate != nil {
		sc.delegate.OnConnectionClosed()
	}
}
// senderLoop drains the send queue and writes packets to the socket
func (sc *HsmsConnection) senderLoop(ctx context.Context) {
	defer func() {
		sc.view.IsSending = false
	}()
	sc.view.IsSending = true
	sock := sc.sock
	for ctx.Err() == nil {
		item := sc.sendQueue.Get(ctx, time.Second*5)
		if item != nil {
			pkt := item.Value
			systemId := pkt.Header.System
			// drop packets that waited in the queue longer than T3
			waitTime := time.Since(item.JoinTime)
			if waitTime > sc.config.T3 {
				sc.log.Debugf("[system=0x%x] send_queue overtime wait_time=%s ", systemId, waitTime)
				continue
			}
			sc.log.Infof("[system=0x%x] send message: %s", systemId, pkt)
			err := sc._sendPacket(sock, pkt)
			if err != nil {
				sc.log.Warnf("[system=0x%x] %s", systemId, err)
			}
		}
	}
	sc.log.Debug("sender stopped")
}
func (sc *HsmsConnection) _receiverReadData(ctx context.Context) error {
	var buff = make([]byte, 10240)
	for ctx.Err() == nil {
		n, err := sc.sock.Read(buff)
		if err != nil {
			sc.connected = false
			return err
		}
		// buffer the received bytes
		data := buff[:n]
		sc.log.Debugf("[recv] %v", data)
		//secsgem.MetricHsmsReadTotal.Add(float64(len(data)))
		sc.receiveBuffer.Write(data)
		// extract as many complete frames as the buffer holds
		for sc._processReceiveBuffer() && ctx.Err() == nil {
		}
	}
	return errors.New("receiver stopped")
}
// _processReceiveBuffer parses one complete frame out of the receive buffer
func (sc *HsmsConnection) _processReceiveBuffer() bool {
	// HSMS frames start with a 4-byte Message Length field, so we need at
	// least 4 bytes before we can do anything.
	// What if TCP splits a frame and the trailing bytes arrive in the next
	// segment - is that next segment dropped? No: we return false here, the
	// caller loops around, and the bytes already received stay in the
	// bytes.Buffer until the rest of the frame arrives.
	if sc.receiveBuffer.Len() < 4 {
		return false
	}
	// total frame size; the +4 accounts for the Message Length field itself,
	// which simplifies the calculations below
	length := binary.BigEndian.Uint32(sc.receiveBuffer.Bytes()[:4]) + 4
	if sc.receiveBuffer.Len() < int(length) { // frame incomplete, keep receiving
		return false
	}
	data := make([]byte, int(length))
	// read exactly length bytes from the buffer; we verified above that they
	// are available
	sc.receiveBuffer.Read(data)
	// decode; packets.Decode strips the leading 4 length bytes itself
	response := packets.Decode(data)
	sc.log.Infof("[system=0x%x] recv message: %s", response.Header.System, response)
	// hand off to the handler goroutine
	sc.recvChan <- response
	// if data remains in the buffer, tell the caller to keep processing
	hasData := sc.receiveBuffer.Len() > 0
	return hasData
}
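// Worked example (illustrative): an HSMS frame carrying the 10-byte header
// and no data body arrives on the wire as
//
//	00 00 00 0A | <10 header bytes>
//
// Message Length (big-endian) counts everything after its own 4 bytes, so
// the total number of bytes consumed from the buffer is length+4 = 14 here.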
// handleRevMessageLoop dispatches received packets to the delegate
func (sc *HsmsConnection) handleRevMessageLoop(ctx context.Context) {
	defer func() {
		sc.view.IsHanding = false
	}()
	sc.view.IsHanding = true
	for ctx.Err() == nil {
		select {
		case message := <-sc.recvChan: // filled by _processReceiveBuffer
			if sc.delegate == nil {
				continue
			}
			go func() {
				defer func() {
					if e := recover(); e != nil {
						sc.log.Errorf("[panic] recv handler %v\n%s", e, debug.Stack())
					}
				}()
				sc.delegate.OnConnectionPacketReceived(nil, message)
			}()
		case <-ctx.Done():
			sc.log.Debug("recv message handle stopped")
			return
		}
	}
}
/*-----------------------------------
multi-server
-----------------------------------*/
type HsmsMultiPassiveServer struct {
}
/*-------------------------------------
Status
-------------------------------------*/
func (sc *HsmsConnection) Status() secsgem.Status {
	sc.mx.RLock()
	defer sc.mx.RUnlock()
	return sc.status
}
func (sc *HsmsConnection) isClosed() bool {
	return sc.status == secsgem.CLOSED
}
func (sc *HsmsConnection) isConnected() bool {
	return sc.status == secsgem.CONNECTED
}
func (sc *HsmsConnection) isDisconnected() bool {
	return sc.status == secsgem.DISCONNECTED
}
func (sc *HsmsConnection) isConnecting() bool {
	return sc.status == secsgem.CONNECTING
}
func (sc *HsmsConnection) isReconnecting() bool {
	return sc.status == secsgem.RECONNECTING
}
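Before moving on to the two concrete modes, here is a small self-contained sketch of the framing logic above (the helper name tryReadFrame is mine, and the payload is arbitrary). It feeds one length-prefixed frame to a bytes.Buffer in two halves, the way TCP might deliver it, and shows that the partial first read is not lost:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// tryReadFrame mirrors _processReceiveBuffer: it returns one complete frame
// if the buffer holds one, or nil when more bytes are still needed.
func tryReadFrame(buf *bytes.Buffer) []byte {
	if buf.Len() < 4 {
		return nil
	}
	length := binary.BigEndian.Uint32(buf.Bytes()[:4]) + 4
	if buf.Len() < int(length) {
		return nil
	}
	frame := make([]byte, length)
	buf.Read(frame)
	return frame
}

func main() {
	// build one frame: 4-byte big-endian length prefix + 10 payload bytes
	payload := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	frame := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(frame[:4], uint32(len(payload)))
	copy(frame[4:], payload)

	var buf bytes.Buffer
	buf.Write(frame[:7])            // first "TCP segment": prefix + 3 payload bytes
	fmt.Println(tryReadFrame(&buf)) // nil: frame incomplete, bytes stay buffered
	buf.Write(frame[7:])            // second segment completes the frame
	fmt.Println(tryReadFrame(&buf)) // the full 14-byte frame is recovered
}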
2. Host mode
In this mode the configuration has active=true.
package connection

import (
	"context"
	"fmt"
	"net"
	"time"

	"secs-gem/hsms/queue"
)
/*-----------------------------------
active (host mode)
-----------------------------------*/
type HsmsActiveConnection struct {
	*HsmsConnection
	hTimer *time.Timer
}
func (hac *HsmsActiveConnection) Enable() error {
	if hac.view.Enabled.Load() { // already enabled, nothing to do
		return nil
	}
	hac.mx.Lock()
	defer hac.mx.Unlock()
	hac.ctx, hac.ctxCancel = context.WithCancel(context.Background())
	hac.sendQueue = queue.NewPriorityQueue()
	hac.view.SQueue = hac.sendQueue.SQueueView
	// establish the connection
	if err := hac.startActiveConnect(); err != nil {
		return err
	}
	// the periodic reconnect check only starts after the first successful connect
	if hac.config.AutoReconnect {
		if hac.hTimer == nil {
			hac.hTimer = time.AfterFunc(hac.config.T5, hac.activeConnCheckTimer)
			hac.log.Info("start timing detector")
		} else {
			hac.hTimer.Reset(hac.config.T5)
		}
	}
	hac.view.Enabled.Store(true)
	return nil
}
func (hac *HsmsActiveConnection) Disable() error {
	if !hac.view.Enabled.Load() {
		return nil
	}
	hac.mx.Lock()
	defer hac.mx.Unlock()
	hac.stopCheckTimer() // stop hTimer
	hac.disable()
	hac.view.Enabled.Store(false)
	return nil
}
func (hac *HsmsActiveConnection) GetView() interface{} {
	hac.mx.RLock()
	defer hac.mx.RUnlock()
	return hac.view
}
// startActiveConnect dials the configured ip:port; it is used when the
// config has active=true
func (hac *HsmsActiveConnection) startActiveConnect() error {
	if !hac.connected {
		// open the connection
		address := fmt.Sprintf("%s:%d", hac.config.Host, hac.config.GetPort())
		tcpAddr, err := net.ResolveTCPAddr("tcp4", address)
		if err != nil {
			return fmt.Errorf("start active hsms error: %s", err)
		}
		if conn, err1 := net.DialTCP("tcp", nil, tcpAddr); err1 != nil {
			hac.log.Warn(err1)
			return err1
		} else {
			hac.log.Infof("connected to %s", address)
			hac.sock = conn
			// start the receive/send goroutines
			hac.startReceiverAndSender()
		}
	}
	return nil
}
func (hac *HsmsActiveConnection) activeConnCheckTimer() {
	defer hac.hTimer.Reset(hac.config.T5)
	// try to (re)connect; startActiveConnect is a no-op while connected
	hac.startActiveConnect()
}
func (hac *HsmsActiveConnection) stopCheckTimer() {
	if hac.hTimer != nil {
		// Stop halts the timer; it does not close the timer's channel, which
		// prevents a spurious successful receive from the channel
		hac.hTimer.Stop()
	}
}
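The reconnect loop above is a self-rescheduling time.AfterFunc: the callback re-arms the timer with a deferred Reset, so connect attempts stay spaced T5 apart no matter how long each dial takes. A minimal standalone sketch of this pattern (the one-second interval and names here are mine):

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := time.Second
	var t *time.Timer
	t = time.AfterFunc(interval, func() {
		defer t.Reset(interval) // re-arm after every attempt
		fmt.Println("connect attempt at", time.Now().Format("15:04:05"))
	})
	time.Sleep(3500 * time.Millisecond) // observe roughly three attempts
	t.Stop()
}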
3. Equipment mode
package connection

import (
	"context"
	"fmt"
	"net"

	"secs-gem/hsms/queue"
)
/*-----------------------------------
server (equipment mode)
-----------------------------------*/
type HsmsPassiveConnection struct {
	*HsmsConnection
	tcpServer *net.TCPListener
}
func (hpc *HsmsPassiveConnection) Enable() error {
	if hpc.view.Enabled.Load() { // already enabled, nothing to do
		return nil
	}
	hpc.mx.Lock()
	defer hpc.mx.Unlock()
	hpc.ctx, hpc.ctxCancel = context.WithCancel(context.Background())
	hpc.sendQueue = queue.NewPriorityQueue()
	hpc.view.SQueue = hpc.sendQueue.SQueueView
	if err := hpc.startPassiveServer(); err != nil {
		return err
	}
	hpc.view.Enabled.Store(true)
	return nil
}
func (hpc *HsmsPassiveConnection) Disable() error {
	if !hpc.view.Enabled.Load() {
		return nil
	}
	hpc.mx.Lock()
	defer hpc.mx.Unlock()
	hpc.HsmsConnection.disable()
	hpc.tcpServer.Close()
	hpc.view.Enabled.Store(false)
	return nil
}
func (hpc *HsmsPassiveConnection) GetView() interface{} {
	hpc.mx.RLock()
	defer hpc.mx.RUnlock()
	return hpc.view
}
// startPassiveServer listens on the configured address; it is used when the
// config has active=false
func (hpc *HsmsPassiveConnection) startPassiveServer() error {
	address := fmt.Sprintf("%s:%d", hpc.config.Host, hpc.config.GetPort())
	tcpAddr, err := net.ResolveTCPAddr("tcp4", address) // resolve the TCP address
	if err != nil {
		return fmt.Errorf("start passive hsms error1: %s", err)
	}
	var err1 error
	// listen on the server address
	if hpc.tcpServer, err1 = net.ListenTCP("tcp", tcpAddr); err1 != nil {
		return fmt.Errorf("start passive hsms error2: %s", err1)
	}
	hpc.log.Info("passive server started")
	go func() {
		defer func() {
			hpc.view.IsListening = false
		}()
		hpc.view.IsListening = true
		for hpc.ctx.Err() == nil {
			conn, err := hpc.tcpServer.AcceptTCP() // block until a client connects
			if err != nil {
				hpc.log.Warnf("accept error %s", err)
				continue
			}
			if hpc.sock != nil { // only one client may be connected at a time!
				if hpc.connected {
					hpc.log.Warnf("exists conn %s", hpc.sock)
					conn.Close()
					continue
				}
			}
			hpc.log.Infof("connected by %s", conn.RemoteAddr())
			hpc.sock = conn
			go hpc.startReceiverAndSender()
		}
	}()
	return nil
}
Looking back over the code, the most important piece is the shared part: how data is received and how it is processed. Once that is clear, the rest is easy to follow. The _processReceiveBuffer function above also demonstrates the classic cure for TCP "sticky packets" (stream fragmentation): frame the transmitted data in a TLV (type-length-value) style. If this is new to you, have a look at the zinx framework by 刘丹冰, whose documentation (section 5.2, on message packing and unpacking) explains how TLV framing solves the TCP sticky-packet problem.
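As a closing illustration, here is a minimal TLV-style framing sketch (my own example, not zinx's actual API): each message carries a type and a length, so the receiver always knows where one message ends and the next begins, no matter how TCP splits the stream:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodeTLV frames a message as: 1-byte type | 4-byte big-endian length | value
func encodeTLV(msgType byte, value []byte) []byte {
	frame := make([]byte, 5+len(value))
	frame[0] = msgType
	binary.BigEndian.PutUint32(frame[1:5], uint32(len(value)))
	copy(frame[5:], value)
	return frame
}

// decodeTLV reads one complete TLV frame from buf, or reports ok=false when
// more bytes are still in flight.
func decodeTLV(buf *bytes.Buffer) (msgType byte, value []byte, ok bool) {
	if buf.Len() < 5 {
		return 0, nil, false
	}
	length := binary.BigEndian.Uint32(buf.Bytes()[1:5])
	if buf.Len() < 5+int(length) {
		return 0, nil, false
	}
	frame := make([]byte, 5+length)
	buf.Read(frame)
	return frame[0], frame[5:], true
}

func main() {
	var stream bytes.Buffer
	// two messages back to back in one stream, as TCP may deliver them
	stream.Write(encodeTLV(1, []byte("hello")))
	stream.Write(encodeTLV(2, []byte("world")))
	for {
		t, v, ok := decodeTLV(&stream)
		if !ok {
			break
		}
		fmt.Printf("type=%d value=%s\n", t, v)
	}
}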