首页 > 其他分享 >08、从0到1实现SECS协议之HSMS协议中的connection

08、从0到1实现SECS协议之HSMS协议中的connection

时间:2023-08-03 18:57:41浏览次数:40  
标签:return nil 08 hac SECS hpc connection func sc

前面实现优先级队列,现在就准备开始实现 HSMS 协议中如何处理 connection。在之前定义的接口中,我们定义了 IConnection 接口,现在我们先来实现这个接口。我们知道, 此软件 既可以作为 Host 端,主动去连接 equipment,也是可以作为 equipment 端,让 其他系统来连接我们的系统。因此,connection 是分为 两种类型的。

但是他们在实现 IConnection 接口 时,有部分内容是相同的,因此我们先实现相同的部分,不相同的部分我们后面在实现。

1、相同的部分

package connection

import (
	"secs-gem/hsms/packet"
	"secs-gem/hsms/queue"
	driver_log "secs-gem/log"
	"secs-gem/secsgem"

	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"net"
	"runtime/debug"
	"strings"
	"sync"
	"time"
)

const (
	SendBlockSize int = 1024 * 1024 // 每次发送数据的最大字节数
)

/*-----------------------------------

	连接

-----------------------------------*/

func NewConnection(cfg *secsgem.ConnectionCfg, delegate secsgem.IDelegate) secsgem.IConnection {
	var sc secsgem.IConnection
	if cfg.Active {
		sc = &HsmsActiveConnection{
			HsmsConnection: newHsmsConnection(cfg, delegate),
		}
	} else {
		sc = &HsmsPassiveConnection{
			HsmsConnection: newHsmsConnection(cfg, delegate),
		}
	}
	return sc
}

func newHsmsConnection(cfg *secsgem.ConnectionCfg, delegate secsgem.IDelegate) *HsmsConnection {
	sc := &HsmsConnection{
		view:      &secsgem.ConnView{},
		delegate:  delegate,
		recvChan:  make(chan *packets.HsmsPacket, 1024),
		connected: false,
		log:       zap.S(),
	}
	//设置cfg
	sc.SetConfig(cfg)
	return sc
}

type HsmsConnection struct {
	ctx       context.Context
	ctxCancel context.CancelFunc
	config    *secsgem.ConnectionCfg
	view      *secsgem.ConnView

	// 建立连接/断开连接的处理过程
	delegate secsgem.IDelegate
	sock     *net.TCPConn
	status   secsgem.Status

	// 缓存
	receiveBuffer bytes.Buffer

	// 接收chan
	recvChan chan *packets.HsmsPacket
	// 发送queue
	sendQueue *queue.PriorityQueue
	// 是否已连接
	connected bool

	//日志
	log   *zap.SugaredLogger
	mx    sync.RWMutex
	group errgroup.Group
}

func (sc *HsmsConnection) GetConfig() *secsgem.ConnectionCfg {
	return sc.config
}

func (sc *HsmsConnection) SetConfig(cfg *secsgem.ConnectionCfg) {
	if sc.view.Enabled.Load() {
		//已启用不能修改
		return
	}
	sc.mx.Lock()
	defer sc.mx.Unlock()

	cfg.ReviseDuration() // 修正精度
	sc.config = cfg
	// 往日志中添加 port、active 字段
	// 后续使用 sc.log 输出的日志都将拥有 With 后面的内容
	sc.log = driver_log.S().With("port", cfg.GetPort(), "active", cfg.Active)
}

func (sc *HsmsConnection) disable() error {
	if sc.ctx != nil && sc.ctx.Err() == nil {
		sc.ctxCancel()
	}
	//关闭连接
	if sc.sock != nil {
		sc.sock.Close()
	}

	// 等待 errgroup 启动的 goroutine 的协程退出
	sc.group.Wait()
	return nil
}

func (sc *HsmsConnection) GetCtx() context.Context {
	return sc.ctx
}

// SendPacket 发送 packet
func (sc *HsmsConnection) SendPacket(packet interface{}, priority int) error {
	pkt, ok := packet.(*packets.HsmsPacket)
	if !ok {
		return fmt.Errorf("packet type error %s", pkt)
	}
	if !sc.connected {
		return fmt.Errorf("disconnected")
	}
	priority |= secsgem.PriorityNORMAL

	//放入queue
	systemId := pkt.Header.System
	sc.log.Debugf("[system=0x%x] into send_queue", systemId)

	sc.sendQueue.PutNoWait(&queue.QueueItem{
		Value:    pkt,
		Priority: priority,
		JoinTime: time.Now(),
	})
	return nil
}

// 发送
func (sc *HsmsConnection) _sendPacket(sock *net.TCPConn, pkt *packets.HsmsPacket) error {

	systemId := pkt.Header.System
	data := pkt.Encode()

	length := len(data)

	sc.log.Debugf("[system=0x%x] send packet %s", systemId, pkt)

	for i := 0; i < length; i += SendBlockSize {
		start := i
		end := i + SendBlockSize
		if end > length {
			end = length
		}
		block := data[start:end]
		retryCount := 5
		retry := true
		for retry && retryCount > 0 {
			retryCount -= 1

			sc.log.Debugf("[system=0x%x][send] %v", systemId, block)

			//secsgem.MetricHsmsSendTotal.Add(float64(len(block)))

			_, err := sock.Write(block)
			if err != nil {
				errMsg := err.Error()
				sc.log.Warn("socket conn send data failed, err: ", errMsg)
				if strings.Contains(errMsg, "use of closed network connection") {
					return err
				}
				continue
			}
			retry = false
		}
	}
	return nil
}

// startReceiverAndSender 开启接受者和发送者
func (sc *HsmsConnection) startReceiverAndSender() {
	sc.connected = true
	//工作context
	_ctx, _ctxCancel := context.WithCancel(sc.ctx)

	//接收线程
	sc.group.Go(func() error {
		defer func() {
			if e := recover(); e != nil {
				sc.log.Errorf("[panic]%v\n%s", e, debug.Stack())
			}
		}()
		sc.receiverLoop(_ctx, _ctxCancel)
		return nil
	})

	//发送线程
	sc.group.Go(func() error {
		defer func() {
			if e := recover(); e != nil {
				sc.log.Errorf("[panic]%v\n%s", e, debug.Stack())
			}
		}()
		sc.senderLoop(_ctx)
		return nil
	})

	//接收消息处理
	sc.group.Go(func() error {
		sc.handleRevMessageLoop(_ctx)
		return nil
	})

	//连接
	if sc.delegate != nil {
		sc.delegate.OnConnectionEstablished()
	}
}

func (sc *HsmsConnection) receiverLoop(ctx context.Context, cancel context.CancelFunc) {
	defer func() {
		sc.view.IsReceiving = false
	}()

	sc.view.IsReceiving = true
	if err := sc._receiverReadData(ctx); err != nil {
		sc.log.Warnf("receiver error: %s", err)
	}

	//关闭连接
	sc.sock.Close()
	//ctx.
	cancel()

	//通知关闭连接
	if sc.delegate != nil {
		sc.delegate.OnConnectionClosed()
	}
}

//发送线程
func (sc *HsmsConnection) senderLoop(ctx context.Context) {
	defer func() {
		sc.view.IsSending = false
	}()
	sc.view.IsSending = true

	sock := sc.sock

	for ctx.Err() != context.Canceled {
		item := sc.sendQueue.Get(ctx, time.Second*5)
		if item != nil {
			pkt := item.Value
			systemId := pkt.Header.System
			// 检测超时
			waitTime := time.Now().Sub(item.JoinTime)
			if waitTime > sc.config.T3 {
				sc.log.Debugf("[system=0x%x] send_queue overtime wait_time=%s ", systemId, waitTime)
				continue
			}
			sc.log.Infof("[system=0x%x] send message: %s", systemId, pkt)
			err := sc._sendPacket(sock, pkt)
			if err != nil {
				sc.log.Warnf("[system=0x%x] %s", systemId, err)
			}
		}
	}
	sc.log.Debug("sender stopped")
}

func (sc *HsmsConnection) _receiverReadData(ctx context.Context) error {
	var buff = make([]byte, 10240)
	for ctx.Err() == nil {
		n, err := sc.sock.Read(buff)
		if err != nil {
			sc.connected = false
			return err
		}
		//缓存
		data := buff[:n]
		sc.log.Debugf("[recv] %v", data)

		//secsgem.MetricHsmsReadTotal.Add(float64(len(data)))

		sc.receiveBuffer.Write(data)

		//处理
		for sc._processReceiveBuffer() && ctx.Err() == nil {
		}
	}
	return errors.New("receiver stopped")
}

//处理数据包
func (sc *HsmsConnection) _processReceiveBuffer() bool {
	// hsms 协议规定的前四个字节为: Message Length, 因此收到的字节数不能少于 4 个
	// 这里有一个问题, 假设 tcp 的包被拆分后, 后面几个字节恰好在下一个包中, 那下一个包就会被丢弃??
	// 其实不会的, 因为这里返回 false , 在调用此函数的地方, 是一个 for 循环, 而接收到的数据是存放在 bytes.Buffer 中的
	if sc.receiveBuffer.Len() < 4 {
		return false
	}

	//长度, 这里加上 4 的原因是加上了 Message Length 后, 方便后面的计算而已
	length := binary.BigEndian.Uint32(sc.receiveBuffer.Bytes()[:4]) + 4

	if sc.receiveBuffer.Len() < int(length) { // 未接受完数据, 继续接受
		return false
	}
	data := make([]byte, int(length))
	// 从 bytes.Buffer 中读取 length 的数据, 这里一定确保了有 length 的长度的数据
	sc.receiveBuffer.Read(data)

	//解析, 所以 decode 函数里面, 会先把前 4 个字节去掉
	response := packets.Decode(data)
	sc.log.Infof("[system=0x%x] recv message: %s", response.Header.System, response)

	//放入chan
	sc.recvChan <- response

	//返回, 如果还有剩余的数据, 继续接受处理
	hasData := sc.receiveBuffer.Len() > 0
	return hasData
}

//接收消息处理
func (sc *HsmsConnection) handleRevMessageLoop(ctx context.Context) {
	defer func() {
		sc.view.IsHanding = false
	}()

	sc.view.IsHanding = true
	for ctx.Err() == nil {
		select {
		case message := <-sc.recvChan: // _processReceiveBuffer 函数中接受数据, 将接收到的数据写入到 sc.recvChan 中
			if sc.delegate == nil {
				continue
			}

			go func() {
				defer func() {
					if e := recover(); e != nil {
						sc.log.Errorf("[panic] recv handler %v\n%s", e, debug.Stack())
					}
				}()
				sc.delegate.OnConnectionPacketReceived(nil, message)
			}()
		case <-ctx.Done():
			sc.log.Debug("recv message handle stopped")
			return
		}
	}
}

/*-----------------------------------

	multi-server

-----------------------------------*/

type HsmsMultiPassiveServer struct {
}

/*-------------------------------------
		状态
-------------------------------------*/

func (sc *HsmsConnection) Status() secsgem.Status {
	sc.mx.RLock()
	defer sc.mx.RUnlock()
	return sc.status
}

func (sc *HsmsConnection) isClosed() bool {
	return sc.status == secsgem.CLOSED
}

func (sc *HsmsConnection) isConnected() bool {
	return sc.status == secsgem.CONNECTED
}

func (sc *HsmsConnection) isDisconnected() bool {
	return sc.status == secsgem.DISCONNECTED
}

func (sc *HsmsConnection) isConnecting() bool {
	return sc.status == secsgem.CONNECTING
}

func (sc *HsmsConnection) isReconnecting() bool {
	return sc.status == secsgem.RECONNECTING
}

2、host mode

此时配置中的 active=true

package connection

import (
	"secs-gem/hsms/queue"

	"context"
	"fmt"
	"net"
	"time"
)

/*-----------------------------------

	active (host mode)

-----------------------------------*/

type HsmsActiveConnection struct {
	*HsmsConnection
	hTimer *time.Timer
}

func (hac *HsmsActiveConnection) Enable() error {
	if hac.view.Enabled.Load() { // 已启动就不能启动了
		return nil
	}
	hac.mx.Lock()
	defer hac.mx.Unlock()

	hac.ctx, hac.ctxCancel = context.WithCancel(context.Background())
	hac.sendQueue = queue.NewPriorityQueue()
	hac.view.SQueue = hac.sendQueue.SQueueView
	//启动连接
	if err := hac.startActiveConnect(); err != nil {
		return err
	}

	// 首次连接成功后, 才会开启 循环重连
	if hac.config.AutoReconnect {
		if hac.hTimer == nil {
			hac.hTimer = time.AfterFunc(hac.config.T5, hac.activeConnCheckTimer)
			hac.log.Info("start timing detector")
		} else {
			hac.hTimer.Reset(hac.config.T5)
		}
	}
	hac.view.Enabled.Store(true)
	return nil
}

func (hac *HsmsActiveConnection) Disable() error {
	if !hac.view.Enabled.Load() {
		return nil
	}

	hac.mx.Lock()
	defer hac.mx.Unlock()

	hac.stopCheckTimer() // 停止 hTimer
	hac.disable()
	hac.view.Enabled.Store(false)
	return nil
}

func (hac *HsmsActiveConnection) GetView() interface{} {
	hac.mx.RLock()
	defer hac.mx.RUnlock()
	return hac.view
}

// startActiveConnect 此时配置中的active=true
// 主动连接配置中的 ip:port
func (hac *HsmsActiveConnection) startActiveConnect() error {

	if !hac.connected {
		//打开连接
		address := fmt.Sprintf("%s:%d", hac.config.Host, hac.config.GetPort())
		tcpAddr, err := net.ResolveTCPAddr("tcp4", address)
		if err != nil {
			return fmt.Errorf("start active hsms error: %s", err)
		}

		if conn, err1 := net.DialTCP("tcp", nil, tcpAddr); err1 != nil {
			hac.log.Warn(err1)
			return err1
		} else {
			hac.log.Infof("connected to %s", address)
			hac.sock = conn
			//消息处理
			hac.startReceiverAndSender()
		}
	}
	return nil
}

func (hac *HsmsActiveConnection) activeConnCheckTimer() {
	defer hac.hTimer.Reset(hac.config.T5)

	//连接
	hac.startActiveConnect()
}

func (hac *HsmsActiveConnection) stopCheckTimer() {
	if hac.hTimer != nil {
		// 停止计时器, 停止计数器并不会关闭 通道, 以防止从通道读取成功
		hac.hTimer.Stop()
	}
}

3、equipment mode

package connection

import (
	"secs-gem/hsms/queue"

	"context"
	"fmt"
	"net"
)

/*-----------------------------------

	server (equipment mode)

-----------------------------------*/

type HsmsPassiveConnection struct {
	*HsmsConnection
	tcpServer *net.TCPListener
}

func (hpc *HsmsPassiveConnection) Enable() error {
	if hpc.view.Enabled.Load() { // 已启动就不能启动了
		return nil
	}

	hpc.mx.Lock()
	defer hpc.mx.Unlock()

	hpc.ctx, hpc.ctxCancel = context.WithCancel(context.Background())
	hpc.sendQueue = queue.NewPriorityQueue()
	hpc.view.SQueue = hpc.sendQueue.SQueueView

	if err := hpc.startPassiveServer(); err != nil {
		return err
	}
	hpc.view.Enabled.Store(true)
	return nil
}

func (hpc *HsmsPassiveConnection) Disable() error {
	if !hpc.view.Enabled.Load() {
		return nil
	}

	hpc.mx.Lock()
	defer hpc.mx.Unlock()

	hpc.HsmsConnection.disable()
	hpc.tcpServer.Close()
	hpc.view.Enabled.Store(false)
	return nil
}

func (hpc *HsmsPassiveConnection) GetView() interface{} {
	hpc.mx.RLock()
	defer hpc.mx.RUnlock()

	return hpc.view
}

// startPassiveServer 此时配置文件中的active=false
func (hpc *HsmsPassiveConnection) startPassiveServer() error {
	address := fmt.Sprintf("%s:%d", hpc.config.Host, hpc.config.GetPort())
	tcpAddr, err := net.ResolveTCPAddr("tcp4", address) // 获取一个TCP的Addr
	if err != nil {
		return fmt.Errorf("start passive hsms error1: %s", err)
	}

	var err1 error
	// 监听服务器地址
	if hpc.tcpServer, err1 = net.ListenTCP("tcp", tcpAddr); err1 != nil {
		return fmt.Errorf("start passive hsms error2: %s", err1)
	}

	hpc.log.Info("passive server started")
	go func() {
		defer func() {
			hpc.view.IsListening = false
		}()
		hpc.view.IsListening = true

		for hpc.ctx.Err() == nil {
			conn, err := hpc.tcpServer.AcceptTCP() // 阻塞等待客户端建立连接请求
			if err != nil {
				hpc.log.Warnf("accept error %s", err)
				continue
			}
			if hpc.sock != nil { // 意味着此时只能有一个连接能够连接成功!!
				if hpc.connected {
					hpc.log.Warnf("exists conn %s", hpc.sock)
					conn.Close()
					continue
				}
			}
			hpc.log.Infof("connected by %s", conn.RemoteAddr())
			hpc.sock = conn

			go hpc.startReceiverAndSender()
		}
	}()
	return nil
}

看过上述代码后,其实最重要的就是相同部分中如何接受数据以及如何处理数据,把这个搞懂了,其他的就比较好理解。

通过上面的 _processReceiveBuffer 函数,我们也可以看到了 tcp 解决粘包的经典解决方式,即传输的数据格式采用TLV的方式组合数据。

对此不太熟悉的同学,可以看看刘丹冰大佬写的 zinx 框架,此链接介绍了 TLV封包来解决 TCP 粘包的问题, 5.2 消息的封包与拆包

标签:return,nil,08,hac,SECS,hpc,connection,func,sc
From: https://www.cnblogs.com/huageyiyangdewo/p/17604177.html

相关文章