首页 > 其他分享 >resurfaceio goreplay output-resurface 处理简单说明

resurfaceio goreplay output-resurface 处理简单说明

时间:2024-03-08 09:33:34浏览次数:30  
标签:SliceToString logPrefix byteutils resurface messageCounter Debug output message 

resurfaceio goreplay output-resurface 的处理实际上就是开发了一个新的output 插件,对于数据的写入是使用了
resurfaceio 的golang logger sdk

实现简单说明

output_resurface.go,核心就是一个标准的goreplay plugin,通过go channel 实现消息处理,包装为一个自己的http message
之后通过 logger sdk 写入到resurfaceio api 服务中

  • 消息处理
func (o *ResurfaceOutput) buildMessages() {
    // Keep track of both requests and responses for matching
    messages := make(map[string]*HTTPMessage, o.config.bufferSize)
    // Manually check and remove orphaned requests/responses every 10 s
    straysTicker := time.NewTicker(time.Second * 10)
    // (Debug) Count messages received/sent
    messageCounter := [3]int{0, 0, 0}
    for {
        select {
        case msg := <-o.messages:
            messageCounter[0]++
            metaSlice := payloadMeta(msg.Meta)
            // UUID shared by a given request and its corresponding response
            messageID := byteutils.SliceToString(metaSlice[1])
 
            message, messageFound := messages[messageID]
            if !messageFound {
                // Message timestamp
                messageTimestamp := byteutils.SliceToString(metaSlice[2])
 
                message = &HTTPMessage{}
                messages[messageID] = message
 
                messageTime, err := strconv.ParseInt(messageTimestamp, 10, 64)
                if err == nil {
                    message.initTime = time.Unix(0, messageTime)
                } else {
                    message.initTime = time.Now()
                    Debug(2, logPrefix, "Error parsing message timestamp", err.Error())
                }
            }
 
            reqFound := message.request != nil
            if !reqFound && isRequestPayload(msg.Meta) {
                message.request = msg
                reqFound = true
            }
 
            respFound := message.response != nil
            if !respFound && !isRequestPayload(msg.Meta) {
                message.response = msg
                respFound = true
            }
 
            if reqFound && respFound {
                o.httpMessages <- message
                messageCounter[1]++
                delete(messages, messageID)
            }
 
        case <-straysTicker.C:
            if n := len(messages); n > 0 {
                Debug(3, logPrefix, "Number of messages in queue:", n)
                for id, message := range messages {
                    Debug(4, logPrefix, "Checking message:", id)
                    hasRequest := message.request != nil
                    hasResponse := message.response != nil
                    if ((hasRequest && !hasResponse) || (!hasRequest && hasResponse)) &&
                        time.Since(message.initTime) >= time.Minute*2 {
 
                        Debug(3, logPrefix, "STRAY MESSAGE:", id)
                        if Settings.Verbose > 3 {
                            if hasRequest {
                                Debug(4, logPrefix, "REQUEST:", byteutils.SliceToString(message.request.Meta))
                                Debug(5, logPrefix, "REQUEST:\n", byteutils.SliceToString(message.request.Data))
                            }
                            if hasResponse {
                                Debug(4, logPrefix, "RESPONSE:", byteutils.SliceToString(message.response.Meta))
                                Debug(5, logPrefix, "RESPONSE:\n", byteutils.SliceToString(message.response.Data))
                            }
                        }
 
                        delete(messages, id)
                        Debug(3, logPrefix, "MESSAGE", id, "DELETED")
                        messageCounter[2]++
                    }
                }
            }
            if messageCounter[0]+messageCounter[1]+messageCounter[2] != 0 {
                Debug(1, logPrefix, "messages received:", messageCounter[0],
                    ", full messages sent:", messageCounter[1], ", orphans deleted:", messageCounter[2])
                messageCounter = [3]int{0, 0, 0}
            }
        }
    }
}
  • 数据写入处理
func (o *ResurfaceOutput) sendMessages() {
    for message := range o.httpMessages {
        req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(message.request.Data)))
        if reqErr != nil {
            continue
        }
 
        resp, respErr := http.ReadResponse(bufio.NewReader(bytes.NewReader(message.response.Data)), req)
        if respErr != nil {
            continue
        }
 
        reqMeta := payloadMeta(message.request.Meta)
        respMeta := payloadMeta(message.response.Meta)
 
        //Debug(4, "[OUTPUT][RESURFACE]", "Processing Message:", id)
        if Settings.Verbose > 4 {
            Debug(5, logPrefix, "Processing Request:", byteutils.SliceToString(reqMeta[1]))
            Debug(6, logPrefix, byteutils.SliceToString(message.request.Data))
            Debug(5, logPrefix, "Processing Response:", byteutils.SliceToString(respMeta[1]))
            Debug(6, logPrefix, byteutils.SliceToString(message.response.Data))
        }
 
        reqTimestamp, _ := strconv.ParseInt(byteutils.SliceToString(reqMeta[2]), 10, 64)
        respTimestamp, _ := strconv.ParseInt(byteutils.SliceToString(respMeta[2]), 10, 64)
 
        interval := (respTimestamp - reqTimestamp) / 1000000
        if interval < 0 {
            interval = 0
        }
 
        resurfaceLogger.SendHttpMessage(o.rLogger, resp, req, respTimestamp/1000000, interval, nil)
    }
}

说明

resurfaceio goreplay 的实现并不难,实际上我们也可以基于此实现一个graylog gelf 协议的

参考资料

https://resurface.io/docs
https://github.com/resurfaceio/goreplay
https://github.com/resurfaceio/logger-go

标签:SliceToString,logPrefix,byteutils,resurface,messageCounter,Debug,output,message,
From: https://www.cnblogs.com/rongfengliang/p/18060320

相关文章

  • kettle MongoDB Output 配置说明
    基本配置ConfigureConnectionTab数据库连接Connectiontimeout:尝试连接数据库所等待的最大时间(毫秒),空为无限,建议5000Sockettimeout:sql在执行成功之前等待读写操作的时间(毫秒),空为无限,建议5000OutputOptionsTab输入表与相关设置Truncateoption:在数据传输前清空表......
  • resurfaceio 参考架构
    resurfaceio是graylog开发的一个api安全解决方案,设计上使用了不少开源的东西,目前并不完全开源但是通过官方文档介绍,可以看到一些机制参考架构图简单说明:resurfaceio对于api安全的处理是通过三大方式解决的,第一中是基于sdk集成,写入请求信息到resurfaceioapi中第二种对......
  • resurfaceio goreplay output-s3 minio 兼容处理
    实际上此问题与以前版本goreplay对于s3的支持是一样的参考处理添加了新的环境变量AWS_FORCE_PATH_STYLE以及AWS_DISABLE_SSL,具体代码在rongfengliang/goreplay-new/blob/resurface/s3_reader.go中参考使用exportAWS_ACCESS_KEY_ID=minioexportAWS_SECRET_ACCE......
  • resurfaceio graylog 的api 安全方案
    resurfaceio是graylog的api安全方案,包含的特性特性简易的api调用捕捉立即攻击以及异常的rest以及graphqlapi处理基于webhook,sql查询,以及数据导出自动化处理快速部署本地或者基于k8s的云环境架构设计resurfaceio对于流量的处理基于了goreplay扩展参考网络流量......
  • resurfaceio gor linux 二进制包
    resurfaceio的gor是对于开源版本的修改(比如开启了企业特性,同时添加了对于resurfaceiohttpoutput的支持),为了方便对于linux系统的使用我拉取了官方镜像,将二进制文件放到github了https://github.com/rongfengliang/resurfaceio-gor支的命令 Gorisasimpleht......
  • kettle从入门到精通 第四十七课 ETL之kettle mongo output 写入
    1、上一节课我们学习了mongoinput读取步骤,本节课我们一起学习下mongoout写入步骤,该步骤可以将数据写入到mongo中,如下图所示。 2、 配置mongo连接,有两种方式,如截图所示。ConnectionString:如StringconnectionString="mongodb://username:password@localhost:27017/myda......
  • ffmpeg之avformat_alloc_output_context2
    函数原型:intavformat_alloc_output_context2(AVFormatContext**ctx,constAVOutputFormat*oformat,constchar*format_name,constchar*filename);功能:查找根据format_name或者filename或者oformat输出类型,并且初始化ctx结......
  • flink的分流器-sideoutput Flink 有两种常见的 State类型,分别是:Keyed State (键控状态
    flink的分流器-sideoutputFlink有两种常见的State类型,分别是:KeyedState(键控状态)和OperatorState(算子状态)为了说明侧输出(sideouptut)的作用,浪尖举个例子,比如现在有一篇文章吧,单词长度不一,但是我们想对单词长度小于5的单词进行wordcount操作,同时又想记录下来哪些单词的长度......
  • Java中的InputStream和OutputStream详解
    引言在Java编程中,处理输入输出是日常任务的一部分,而流(Stream)是实现输入输出的核心概念。在JavaI/OAPI中,InputStream和OutputStream是所有字节流类的基础。本文将详细介绍这两个类及其在Java中的应用。什么是InputStream和OutputStream?InputStream是JavaI/O库中的一个抽象类,它......
  • Shell - Pass output as argument to next command
     Inbashwecanpasstheoutputofonecommandtothenextoneasanargument.Wewillcovermultipleexamples.xargs isveryusefulforpassinginformationbetweencommandswhenchainingisusedinBash:echo-e"Python\nJava"|xargs-I{}e......