11、从0到1实现SECS协议之HSMS协议中的handler
前面已经实现了发送事件的处理机制,接下来我们将实现 ISecsHandler 接口,把前面实现的各项功能组合起来,从而提供一个完整可用的服务。
1、handler 的具体实现
package handler
import (
"context"
"errors"
"fmt"
"github.com/looplab/fsm"
"go.uber.org/zap"
"math/rand"
"runtime/debug"
"secs-gem/functions"
"sync"
"sync/atomic"
"time"
"secs-gem/common"
secs_errors "secs-gem/errors"
"secs-gem/hsms/connection"
packets "secs-gem/hsms/packet"
"secs-gem/hsms/statemachine"
driver_log "secs-gem/log"
"secs-gem/secsgem"
)
// HsmsHandler is the handler for the HSMS protocol. It combines a connection,
// a connection state machine, system-id bookkeeping and the link-test timer.
type HsmsHandler struct {
// Embedded connection; the methods of this type call its Enable, Disable,
// SendPacket, GetConfig, GetCtx and GetView.
secsgem.IConnection
// Message listener: decides how received data packets that are not replies
// to a pending request are processed.
listener secsgem.MessageListener
// Device id, written into the SessionID field of outgoing data headers.
sessionId uint16
// Connection state machine of the HSMS protocol.
connectionState *statemachine.ConnectionState
connected bool // whether the connection is currently established
// System bytes counter of the HSMS protocol.
// System bytes uniquely identify a message transaction: the system bytes of a
// reply message must equal those of the primary message it answers.
systemCounter uint32
systemQueues sync.Map // maps a system id to the chan packets.HsmsPacket on which its reply is awaited
// Link-test heartbeat: the active side periodically sends linktest.req.
linkTestTimer *time.Timer
linkTestTimeout time.Duration
// Guards linkTestTimer.
timeMx sync.Mutex
// Logger.
log *zap.SugaredLogger
// NOTE(review): not used by any method visible in this file.
mx sync.RWMutex
// Message forwarding in bridge mode.
bridge secsgem.ISecsBridge
}
// NewHsmsHandler builds an HSMS handler for the given connection configuration.
//
// cfg supplies the session id, the port and the active/passive flag.
// listener receives data messages that are not replies to a pending request;
// it may be nil, in which case such messages are dropped.
func NewHsmsHandler(cfg *secsgem.ConnectionCfg, listener secsgem.MessageListener) *HsmsHandler {
	h := &HsmsHandler{
		sessionId:       uint16(cfg.SessionId),
		listener:        listener,
		linkTestTimeout: 30 * time.Second,
		log:             driver_log.S().With("port", cfg.Port, "active", cfg.Active),
	}

	// The connection reports its events back to this handler.
	h.IConnection = connection.NewConnection(cfg, h)

	// System bytes start from a random value.
	rand.Seed(time.Now().Unix())
	h.systemCounter = rand.Uint32()

	// Connection state machine and its enter/exit callbacks.
	h.connectionState = statemachine.NewConnectionState("hsms", fsm.Callbacks{
		"on_enter_CONNECTED":          h.onStateConnect,
		"on_exit_CONNECTED":           h.onStateDisconnect,
		"on_enter_CONNECTED_SELECTED": h.onStateSelect,
	})
	return h
}
// IsActive reports whether the connection is configured as the active side.
func (hh *HsmsHandler) IsActive() bool {
	cfg := hh.GetConfig()
	return cfg.Active
}
// SetSessionID overrides the session (device) id used for outgoing messages.
//
// Intended use: automatic device-id discovery. Receiving S9F1 means the
// configured device id is wrong, so it is corrected to the received value.
func (hh *HsmsHandler) SetSessionID(sessionId uint16) {
	hh.sessionId = sessionId
}
// Enable enables the underlying connection and returns its error, if any.
func (hh *HsmsHandler) Enable() error {
	err := hh.IConnection.Enable()
	return err
}
// Disable disables the underlying connection and returns its error, if any.
func (hh *HsmsHandler) Disable() error {
	err := hh.IConnection.Disable()
	return err
}
// Connection state callbacks.

// onStateConnect is the state-machine callback for entering CONNECTED.
// Only the active side (host) starts the link-test timer and sends
// select.req; the passive side does nothing here.
func (hh *HsmsHandler) onStateConnect(ctx context.Context, e *fsm.Event) {
	hh.log.Debug("on state connect")
	if !hh.IsActive() {
		return
	}
	// Start the heartbeat first, then send select.req on its own goroutine
	// because sendSelectReq waits for the reply.
	hh.startLinkTester()
	go hh.sendSelectReq()
}
// onStateDisconnect is the state-machine callback for leaving CONNECTED.
// It stops the link-test timer if one has been created.
func (hh *HsmsHandler) onStateDisconnect(ctx context.Context, e *fsm.Event) {
	hh.timeMx.Lock()
	defer hh.timeMx.Unlock()

	if t := hh.linkTestTimer; t != nil {
		t.Stop()
	}
}
// onStateSelect is the state-machine callback for entering CONNECTED_SELECTED.
// It is currently a no-op.
func (hh *HsmsHandler) onStateSelect(ctx context.Context, e *fsm.Event) {
}
// onLinkTestTimer is the link-test timer callback. It sends linktest.req
// while connected and then re-arms the timer for the next period.
func (hh *HsmsHandler) onLinkTestTimer() {
	// Heartbeat (sendLinkTestReq waits for the reply or a timeout).
	if hh.connected {
		hh.sendLinkTestReq()
	}

	// Re-arm; when no timer exists there is nothing to reset.
	hh.timeMx.Lock()
	defer hh.timeMx.Unlock()
	if t := hh.linkTestTimer; t != nil {
		t.Reset(hh.linkTestTimeout)
	}
}
// startLinkTester starts the periodic LinkTest.req heartbeat. The timer is
// created on first use and merely reset on later calls.
func (hh *HsmsHandler) startLinkTester() {
	hh.timeMx.Lock()
	defer hh.timeMx.Unlock()

	if existing := hh.linkTestTimer; existing != nil {
		existing.Reset(hh.linkTestTimeout)
		return
	}
	hh.linkTestTimer = time.AfterFunc(hh.linkTestTimeout, hh.onLinkTestTimer)
}
// sendSelectReq sends select.req and waits up to T6 for the reply.
//
// Select.req / Select.rsp establish HSMS communication on top of the TCP/IP
// connection: they move the entity from Not Selected to Selected. The active
// entity is the one that sends Select.req.
//
// It returns the reply packet, or nil when the packet could not be sent or no
// reply arrived (the reason is logged).
func (hh *HsmsHandler) sendSelectReq() *packets.HsmsPacket {
	hh.log.Debug("select.req")

	systemId := hh.GetNextSystemCounter()
	queueChan := hh.getQueueForSystem(systemId)
	// Unregister the reply queue on every return path.
	defer hh.removeQueue(systemId)

	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsSelectReqHeader(systemId),
	}
	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityNORMAL); err != nil {
		// The packet was not accepted for sending, so waiting T6 for a
		// reply would only delay the caller.
		hh.log.Debugf("[system=0x%x] %s", systemId, err)
		return nil
	}

	response, err := hh.waitResponse(&queueChan, hh.GetConfig().T6)
	if err != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err)
	}
	return response
}
// sendSelectRsp replies to a select.req with select.rsp, echoing the system id
// of the request.
func (hh *HsmsHandler) sendSelectRsp(systemId uint32) {
	rspHeader := packets.HsmsSelectRspHeader(systemId)
	hh.IConnection.SendPacket(&packets.HsmsPacket{Header: rspHeader}, secsgem.PriorityRESPONSE)
}
// sendLinkTestReq sends linktest.req and waits up to T6 for the reply.
//
// It returns the reply packet, or nil when the packet could not be sent or no
// reply arrived (the reason is logged as a warning).
func (hh *HsmsHandler) sendLinkTestReq() *packets.HsmsPacket {
	hh.log.Debugf("linktest.req (connected=%v)", hh.connected)

	systemId := hh.GetNextSystemCounter()
	queueChan := hh.getQueueForSystem(systemId)
	// Unregister the reply queue on every return path.
	defer hh.removeQueue(systemId)

	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsLinkTestReqHeader(systemId),
	}

	// Publish the outgoing message as an event.
	secsgem.PublishEvent(&secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   systemId,
		Stream:   outPacket.Header.Stream,
		Function: outPacket.Header.Function,
		Packet:   outPacket,
	})

	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityNORMAL); err != nil {
		// The packet was not accepted for sending, so waiting T6 for a
		// reply would only stall the timer callback.
		hh.log.Warnf("[system=0x%x] %s", systemId, err)
		return nil
	}

	response, err := hh.waitResponse(&queueChan, hh.GetConfig().T6)
	if err != nil {
		hh.log.Warnf("[system=0x%x] %s", systemId, err)
	}
	return response
}
// sendLinktestRsp replies to a linktest.req with linktest.rsp, echoing the
// system id of the request, and publishes the outgoing message as an event.
func (hh *HsmsHandler) sendLinktestRsp(systemId uint32) {
	rsp := &packets.HsmsPacket{
		Header: packets.HsmsLinkTestRspHeader(systemId),
	}

	sent := &secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   systemId,
		Stream:   rsp.Header.Stream,
		Function: rsp.Header.Function,
		Packet:   rsp,
	}
	secsgem.PublishEvent(sent)

	hh.IConnection.SendPacket(rsp, secsgem.PriorityRESPONSE)
}
// sendDeselectReq sends deselect.req and waits up to T6 for the reply.
//
// It returns the reply packet, or nil when the packet could not be sent or no
// reply arrived (the reason is logged).
func (hh *HsmsHandler) sendDeselectReq() *packets.HsmsPacket {
	hh.log.Debug("deselect.req")

	systemId := hh.GetNextSystemCounter()
	queueChan := hh.getQueueForSystem(systemId)
	// Unregister the reply queue on every return path.
	defer hh.removeQueue(systemId)

	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsDeselectReqHeader(systemId),
	}
	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityNORMAL); err != nil {
		// The packet was not accepted for sending, so waiting T6 for a
		// reply would only delay the caller.
		hh.log.Debug(err)
		return nil
	}

	response, err := hh.waitResponse(&queueChan, hh.GetConfig().T6)
	if err != nil {
		hh.log.Debug(err)
	}
	return response
}
// sendDeselectRsp replies to a deselect.req with deselect.rsp, echoing the
// system id of the request.
func (hh *HsmsHandler) sendDeselectRsp(systemId uint32) {
	rspHeader := packets.HsmsDeselectRspHeader(systemId)
	hh.IConnection.SendPacket(&packets.HsmsPacket{Header: rspHeader}, secsgem.PriorityRESPONSE)
}
// sendSeparateReq sends separate.req with a fresh system id. It does not wait
// for any reply.
func (hh *HsmsHandler) sendSeparateReq() {
	reqHeader := packets.HsmsSeparateReqHeader(hh.GetNextSystemCounter())
	hh.IConnection.SendPacket(&packets.HsmsPacket{Header: reqHeader}, secsgem.PriorityNORMAL)
}
// GetNextSystemCounter returns the next system id.
//
// The value returned is the result of the atomic increment itself. Reading
// the field again after the increment would be a non-atomic read, so two
// concurrent callers could observe the same value and share a system id.
func (hh *HsmsHandler) GetNextSystemCounter() uint32 {
	return atomic.AddUint32(&hh.systemCounter, 1)
}
// waitResponse blocks until a packet arrives on the reply queue, the
// connection context is done, or the timeout elapses.
//
// queueChan: every system id has its own dedicated reply channel.
// timeout: the maximum time to wait.
//
// The sender on that channel is OnConnectionPacketReceived: it looks the
// system id up in systemQueues, and a hit means the packet is the reply to a
// message this side sent.
//
// NOTE(review): the timeout message always says "T3", although callers in this
// file also pass T6.
func (hh *HsmsHandler) waitResponse(queueChan *chan packets.HsmsPacket, timeout time.Duration) (*packets.HsmsPacket, error) {
	deadline := common.GlobalTimerPool.Get(timeout)
	defer common.GlobalTimerPool.Put(deadline)

	select {
	case reply, open := <-*queueChan:
		if open {
			return &reply, nil
		}
		return nil, errors.New("wait channel closed")
	case <-hh.IConnection.GetCtx().Done():
		return nil, errors.New("wait response context canceled")
	case <-deadline.C:
		return nil, fmt.Errorf("wait response timeout(T3=%s) ", timeout)
	}
}
// Connection event interface.

// OnConnectionEstablished marks the handler as connected, fires the "connect"
// event on the state machine and publishes a ConnectionEstablished event.
func (hh *HsmsHandler) OnConnectionEstablished() {
	hh.connected = true
	hh.log.Info("Connection established")

	bg := context.Background()
	hh.connectionState.FSM.Event(bg, "connect")

	established := &secsgem.ConnectionEstablished{PortID: hh.GetConfig().ID}
	secsgem.PublishEvent(established)
}
// OnConnectionPacketReceived dispatches a packet received on the connection.
//
// Values that are not *packets.HsmsPacket are ignored. Every HSMS packet is
// first published as a SecsMessageRecv event. Control messages (SType > 0) go
// to _handleHsmsRequests. Data messages are handed to the goroutine waiting on
// their system id if there is one, and otherwise to _OnSecsPacketReceived.
func (hh *HsmsHandler) OnConnectionPacketReceived(conn secsgem.IConnection, packet interface{}) {
	pkt, isHsms := packet.(*packets.HsmsPacket)
	if !isHsms {
		return
	}

	// Publish the received message as an event.
	received := &secsgem.SecsMessageRecv{
		PortID:   hh.GetConfig().ID,
		System:   pkt.Header.System,
		Stream:   pkt.Header.Stream,
		Function: pkt.Header.Function,
		Packet:   pkt,
	}
	secsgem.PublishEvent(received)

	// Not a data message.
	if pkt.Header.SType > 0 {
		hh._handleHsmsRequests(pkt)
		return
	}

	// Data message without a registered waiter: an unsolicited message.
	waiting, found := hh.systemQueues.Load(pkt.Header.System)
	if !found {
		hh._OnSecsPacketReceived(pkt)
		return
	}

	// This is the send that waitResponse is blocked on.
	waiting.(chan packets.HsmsPacket) <- *pkt
}
// _handleHsmsRequests handles HSMS control messages (SType != 0).
//
// Requests (select.req, deselect.req, linktest.req) are answered directly.
// Replies are handed to the goroutine waiting on the packet's system id, if
// one is registered.
func (hh *HsmsHandler) _handleHsmsRequests(pkt *packets.HsmsPacket) {
	systemId := pkt.Header.System
	hh.log.Debugf("[system=0x%x] < %s", systemId, pkt)

	// fire triggers a transition on the connection state machine.
	fire := func(event string) {
		hh.connectionState.FSM.Event(context.Background(), event)
	}
	// deliver hands the packet to whoever waits on its system id.
	deliver := func() {
		if waiting, found := hh.systemQueues.Load(systemId); found {
			waiting.(chan packets.HsmsPacket) <- *pkt
		}
	}

	switch sType := pkt.Header.SType; sType {
	case 0x01:
		// select.req: reply, then move to the selected state.
		hh.sendSelectRsp(systemId)
		fire("select")
	case 0x02:
		// select.rsp: the reply to a select.req sent earlier.
		fire("select")
		deliver()
	case 0x03:
		// deselect.req: graceful end of HSMS communication before the TCP/IP
		// connection is closed. HSMS requires the Selected state for this
		// procedure; that is not strictly enforced here.
		hh.sendDeselectRsp(systemId)
		fire("deselect")
	case 0x04:
		// Presumably deselect.rsp, the reply to a deselect.req sent earlier.
		fire("deselect")
		deliver()
	case 0x05:
		// Answered with linktest.rsp.
		hh.sendLinktestRsp(systemId)
	default:
		// Any other control message is only forwarded to a waiter.
		deliver()
	}
}
// OnConnectionBeforeClosed is called before the connection is closed; it logs
// the event and sends separate.req.
func (hh *HsmsHandler) OnConnectionBeforeClosed() {
	const msg = "Connection to be close"
	hh.log.Info(msg)
	hh.sendSeparateReq()
}
// OnConnectionClosed is called after the connection has been closed. It fires
// the "disconnect" event on the state machine and then clears the connected
// flag.
func (hh *HsmsHandler) OnConnectionClosed() {
	hh.log.Info("Connection closed")

	bg := context.Background()
	hh.connectionState.FSM.Event(bg, "disconnect")

	hh.connected = false
}
// _OnSecsPacketReceived handles a data message that nobody is waiting for.
//
// A panic raised while handling the packet is recovered, converted to the
// returned error, and the stack trace is logged as a warning.
func (hh *HsmsHandler) _OnSecsPacketReceived(packet *packets.HsmsPacket) (err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		err = secs_errors.CovertError(recovered)
		hh.log.Warn(string(debug.Stack()))
	}()

	// Automatic device-id correction: adopt the session id carried by S9F1.
	if packet.Header.Stream == 9 && packet.Header.Function == 1 {
		hh.sessionId = packet.Header.SessionID
	}

	if hh.listener == nil {
		return nil
	}

	// How a received message is processed is left to the listener on purpose.
	// This package covers the shared concerns (encoding/decoding, connection
	// handling, receiving packets); what to do with a packet is decided by
	// the user of the package, which keeps it focused and extensible.
	if isPrimary := packet.Header.Function%2 == 1; isPrimary {
		hh.listener.PrimaryIn(hh, packet)
		return nil
	}
	hh.listener.SecondaryIn(hh, packet)
	return nil
}
// getQueueForSystem creates a buffered channel of HsmsPacket (capacity 10) and
// registers it under systemId in the concurrency-safe systemQueues map.
//
// An existing entry for the same id is replaced, so careless use can lose the
// packets queued on the previous channel.
func (hh *HsmsHandler) getQueueForSystem(systemId uint32) chan packets.HsmsPacket {
	const queueSize = 10
	replies := make(chan packets.HsmsPacket, queueSize)
	hh.systemQueues.Store(systemId, replies)
	return replies
}
// removeQueue unregisters the reply queue stored for systemId. Removing an id
// that is not registered is a no-op.
func (hh *HsmsHandler) removeQueue(systemId uint32) {
	hh.systemQueues.Delete(systemId)
}
// SendResponse sends the reply to a received message.
//
// packet is either a *functions.SecsStreamFunction or any value accepted by
// secsgem.CastPacket. system is the system id of the message being answered.
// The call sites are expected to be the PrimaryIn implementations of the flow
// logic and of StandardEquipment.
//
// It returns the conversion error, or the error from sending the packet.
func (hh *HsmsHandler) SendResponse(packet interface{}, system uint32) error {
	var (
		pkt *secsgem.Packet
		err error
	)
	switch msg := packet.(type) {
	case *functions.SecsStreamFunction:
		pkt, err = msg.CastPacket()
	default:
		pkt, err = secsgem.CastPacket(packet)
	}
	if err != nil {
		return err
	}

	// A reply never asks for a reply itself.
	outPacket := &packets.HsmsPacket{
		Header: &packets.HsmsHeader{
			SessionID:       hh.sessionId,
			RequireResponse: false,
			Stream:          pkt.Stream,
			Function:        pkt.Function,
			System:          system,
		},
		Data: pkt.Data,
	}

	// Publish the outgoing message as an event.
	secsgem.PublishEvent(&secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   outPacket.Header.System,
		Stream:   outPacket.Header.Stream,
		Function: outPacket.Header.Function,
		Packet:   outPacket,
	})

	return hh.IConnection.SendPacket(outPacket, secsgem.PriorityRESPONSE)
}
// SendAndWaitForResponse sends a message and waits up to T3 for its reply.
//
// packet is either a *functions.SecsStreamFunction or any value accepted by
// secsgem.CastPacket. priority is OR-ed with secsgem.PriorityNORMAL.
//
// On success the reply is returned as a *packets.HsmsPacket. Conversion errors,
// send errors and wait errors (timeout, closed channel, done context) are
// returned with a nil reply.
func (hh *HsmsHandler) SendAndWaitForResponse(packet interface{}, priority int) (interface{}, error) {
	var (
		pkt *secsgem.Packet
		err error
	)
	switch msg := packet.(type) {
	case *functions.SecsStreamFunction:
		pkt, err = msg.CastPacket()
	default:
		pkt, err = secsgem.CastPacket(packet)
	}
	if err != nil {
		return nil, err
	}

	// A caller-chosen system id that already has a waiter must not be reused,
	// otherwise replies would be mixed up; fall back to an automatic id.
	_, pending := hh.systemQueues.Load(pkt.System)
	if pending && pkt.System != 0 {
		pkt.System = 0
	}
	if pkt.System == 0 {
		pkt.System = hh.GetNextSystemCounter()
	}

	systemId := pkt.System
	queueChan := hh.getQueueForSystem(systemId)
	defer hh.removeQueue(systemId)

	outPacket := &packets.HsmsPacket{
		Header: &packets.HsmsHeader{
			SessionID:       hh.sessionId,
			RequireResponse: true,
			Stream:          pkt.Stream,
			Function:        pkt.Function,
			System:          systemId,
		},
		Data: pkt.Data,
	}

	// Publish the outgoing message as an event.
	secsgem.PublishEvent(&secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   outPacket.Header.System,
		Stream:   outPacket.Header.Stream,
		Function: outPacket.Header.Function,
		Packet:   outPacket,
	})

	sendErr := hh.IConnection.SendPacket(outPacket, priority|secsgem.PriorityNORMAL)
	if sendErr != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, sendErr)
		return nil, sendErr
	}

	hh.log.Debugf("[system=0x%x] start t3 timer", systemId)
	response, waitErr := hh.waitResponse(&queueChan, hh.GetConfig().T3)
	if waitErr != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, waitErr)
		return nil, fmt.Errorf("system=0x%x %s", systemId, waitErr)
	}
	hh.log.Debugf("[system=0x%x] end t3 timer", systemId)
	return response, nil
}
// SendStreamFunction sends a message without waiting for any reply.
//
// packet is either a *functions.SecsStreamFunction or any value accepted by
// secsgem.CastPacket. When the converted packet has no system id (0), the next
// automatic id is assigned.
//
// It returns the conversion error, or the error from sending the packet.
func (hh *HsmsHandler) SendStreamFunction(packet interface{}) error {
	var err error
	var pkt *secsgem.Packet
	if sf, ok := packet.(*functions.SecsStreamFunction); ok {
		if pkt, err = sf.CastPacket(); err != nil {
			return err
		}
	} else {
		if pkt, err = secsgem.CastPacket(packet); err != nil {
			return err
		}
	}
	if pkt.System == 0 {
		pkt.System = hh.GetNextSystemCounter()
	}

	header := &packets.HsmsHeader{
		SessionID:       hh.sessionId,
		RequireResponse: pkt.RequireResponse,
		Stream:          pkt.Stream,
		Function:        pkt.Function,
		System:          pkt.System,
	}
	outPacket := &packets.HsmsPacket{
		Header: header,
		Data:   pkt.Data,
	}

	// Publish the HSMS packet that is actually sent, as SendResponse and
	// SendAndWaitForResponse do, so that event subscribers see the same
	// packet type on every send path.
	secsgem.PublishEvent(&secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   header.System,
		Stream:   header.Stream,
		Function: header.Function,
		Packet:   outPacket,
	})
	return hh.IConnection.SendPacket(outPacket, secsgem.PriorityNORMAL)
}
// Connected reports the connected flag, which is set by OnConnectionEstablished
// and cleared by OnConnectionClosed.
func (hh *HsmsHandler) Connected() bool {
	isUp := hh.connected
	return isUp
}
// SetBridge installs the bridge used by SupportBridgeRecv; passing nil removes
// it.
func (hh *HsmsHandler) SetBridge(bridge secsgem.ISecsBridge) {
	hh.bridge = bridge
}
// SupportBridgeRecv delegates to the installed bridge. Without a bridge it
// returns originalResponseUnused unchanged and a nil error.
func (hh *HsmsHandler) SupportBridgeRecv(fromHandler secsgem.ISecsHandler, packet interface{}, originalResponseUnused interface{}) (interface{}, error) {
	if hh.bridge == nil {
		return originalResponseUnused, nil
	}
	return hh.bridge.SupportBridgeRecv(fromHandler, packet, originalResponseUnused)
}
// GetView returns the view of the underlying connection.
func (hh *HsmsHandler) GetView() interface{} {
	view := hh.IConnection.GetView()
	return view
}
// SetConfig replaces the connection configuration. The change is applied only
// while the connection view is not enabled; otherwise the call does nothing.
func (hh *HsmsHandler) SetConfig(cfg *secsgem.ConnectionCfg) {
	view := hh.IConnection.GetView().(*secsgem.ConnView)
	if view.Enabled.Load() {
		return
	}
	// Build a fresh connection for the new configuration.
	hh.IConnection = connection.NewConnection(cfg, hh)
	hh.IConnection.SetConfig(cfg)
}
2、handler 的单元测试
package handler
import (
"fmt"
"runtime"
"secs-gem/secsgem"
"testing"
)
// TestHsmsHandlerForActive enables an active handler twice and checks that the
// second Enable neither fails nor changes the number of goroutines.
func TestHsmsHandlerForActive(t *testing.T) {
	fmt.Println("-------------------- active 启动两次 --------------------")
	activeConf := secsgem.ConnectionCfg{
		Active:     true,
		DriverType: "hsms",
		Host:       "localhost",
		Port:       5000,
	}
	handler := NewHsmsHandler(&activeConf, nil)

	if err := handler.Enable(); err != nil {
		t.Fatalf("enable active failed, err:%s", err)
	}
	goroutine1 := runtime.NumGoroutine()

	// Each error is scoped to its own check so the failure message reports
	// the error of the call that actually failed.
	if err := handler.Enable(); err != nil {
		t.Fatalf("enable active failed, err:%s", err)
	}
	goroutine2 := runtime.NumGoroutine()

	if goroutine1 != goroutine2 {
		t.Fatalf("active enable goroutine not equal, goroutine1=%d,goroutine2=%d", goroutine1, goroutine2)
	}
}
// TestHsmsHandlerForPassive enables a passive handler twice and checks that
// the second Enable neither fails nor changes the number of goroutines.
func TestHsmsHandlerForPassive(t *testing.T) {
	fmt.Println("-------------------- passive 启动两次 --------------------")
	passiveConf := secsgem.ConnectionCfg{
		Active:     false,
		DriverType: "hsms",
		Host:       "localhost",
		Port:       5001,
	}
	handler := NewHsmsHandler(&passiveConf, nil)

	if err := handler.Enable(); err != nil {
		t.Fatalf("enable passive failed, err:%s", err)
	}
	goroutine1 := runtime.NumGoroutine()

	// Each error is scoped to its own check so the failure message reports
	// the error of the call that actually failed.
	if err := handler.Enable(); err != nil {
		t.Fatalf("enable passive failed, err:%s", err)
	}
	goroutine2 := runtime.NumGoroutine()

	if goroutine1 != goroutine2 {
		t.Fatalf("passive enable goroutine not equal, goroutine1=%d,goroutine2=%d", goroutine1, goroutine2)
	}
}
标签:11,err,HsmsHandler,packets,hh,SECS,handler,systemId,func
From: https://www.cnblogs.com/huageyiyangdewo/p/17660252.html