resurfaceio 的 goreplay output-resurface 实际上就是新开发的一个 output 插件,数据的写入使用的是
resurfaceio 的 golang logger sdk
实现简单说明
output_resurface.go 的核心就是一个标准的 goreplay plugin:通过 go channel 处理消息,把配对的请求和响应包装为自己的 http message,
之后通过 logger sdk 写入到 resurfaceio api 服务中
- 消息处理
// buildMessages pairs captured requests with their responses and forwards
// every completed pair to o.httpMessages.
//
// Each message read from o.messages is keyed by the ID found in the second
// field of its meta header; a request and its response share that ID. The
// first half to arrive is parked in a local map until the other half shows
// up. Every 10 seconds the map is swept and halves that have been waiting
// for two minutes or longer are dropped as strays.
//
// The loop never returns, so this is meant to run in its own goroutine.
func (o *ResurfaceOutput) buildMessages() {
	// Pending halves of request/response pairs, keyed by message ID.
	messages := make(map[string]*HTTPMessage, o.config.bufferSize)
	// Manually check and remove orphaned requests/responses every 10 s
	straysTicker := time.NewTicker(time.Second * 10)
	// (Debug) Counters: [0] received, [1] full pairs sent, [2] orphans deleted
	messageCounter := [3]int{0, 0, 0}
	for {
		select {
		case msg := <-o.messages:
			messageCounter[0]++
			metaSlice := payloadMeta(msg.Meta)
			// The ID (index 1) and timestamp (index 2) are both read below;
			// skip a malformed header instead of panicking on the index.
			if len(metaSlice) < 3 {
				Debug(2, logPrefix, "Skipping message with malformed meta:", byteutils.SliceToString(msg.Meta))
				continue
			}
			// UUID shared by a given request and its corresponding response
			messageID := byteutils.SliceToString(metaSlice[1])
			message, messageFound := messages[messageID]
			if !messageFound {
				message = &HTTPMessage{}
				messages[messageID] = message
				// The meta timestamp is interpreted as nanoseconds since the
				// epoch; fall back to the arrival time if it cannot be parsed.
				messageTimestamp := byteutils.SliceToString(metaSlice[2])
				messageTime, err := strconv.ParseInt(messageTimestamp, 10, 64)
				if err == nil {
					message.initTime = time.Unix(0, messageTime)
				} else {
					message.initTime = time.Now()
					Debug(2, logPrefix, "Error parsing message timestamp", err.Error())
				}
			}
			// Fill whichever half this payload is; a duplicate of a half
			// that is already present is ignored.
			if isRequestPayload(msg.Meta) {
				if message.request == nil {
					message.request = msg
				}
			} else if message.response == nil {
				message.response = msg
			}
			if message.request != nil && message.response != nil {
				o.httpMessages <- message
				messageCounter[1]++
				delete(messages, messageID)
			}
		case <-straysTicker.C:
			if n := len(messages); n > 0 {
				Debug(3, logPrefix, "Number of messages in queue:", n)
				// Deleting entries while ranging over a map is safe in Go.
				for id, message := range messages {
					Debug(4, logPrefix, "Checking message:", id)
					hasRequest := message.request != nil
					hasResponse := message.response != nil
					// A stray has exactly one half and has waited >= 2 minutes.
					if hasRequest != hasResponse &&
						time.Since(message.initTime) >= time.Minute*2 {
						Debug(3, logPrefix, "STRAY MESSAGE:", id)
						// Skip the string conversions unless they will be printed.
						if Settings.Verbose > 3 {
							if hasRequest {
								Debug(4, logPrefix, "REQUEST:", byteutils.SliceToString(message.request.Meta))
								Debug(5, logPrefix, "REQUEST:\n", byteutils.SliceToString(message.request.Data))
							}
							if hasResponse {
								Debug(4, logPrefix, "RESPONSE:", byteutils.SliceToString(message.response.Meta))
								Debug(5, logPrefix, "RESPONSE:\n", byteutils.SliceToString(message.response.Data))
							}
						}
						delete(messages, id)
						Debug(3, logPrefix, "MESSAGE", id, "DELETED")
						messageCounter[2]++
					}
				}
			}
			// Report and reset the counters only when there was activity.
			if messageCounter[0]+messageCounter[1]+messageCounter[2] != 0 {
				Debug(1, logPrefix, "messages received:", messageCounter[0],
					", full messages sent:", messageCounter[1], ", orphans deleted:", messageCounter[2])
				messageCounter = [3]int{0, 0, 0}
			}
		}
	}
}
- 数据写入处理
// sendMessages drains o.httpMessages, parses each captured request/response
// pair into net/http values and hands them to the Resurface logger.
//
// Pairs whose meta header or raw HTTP payload cannot be parsed are logged
// and skipped. The loop ends when o.httpMessages is closed.
func (o *ResurfaceOutput) sendMessages() {
	// Meta timestamps are treated as nanoseconds (see time.Unix(0, ...) in
	// buildMessages); dividing by this yields milliseconds.
	const nanosPerMilli = 1000000

	for message := range o.httpMessages {
		reqMeta := payloadMeta(message.request.Meta)
		respMeta := payloadMeta(message.response.Meta)
		// Index 1 (ID) and index 2 (timestamp) are read from both headers;
		// skip a malformed header instead of panicking on the index.
		if len(reqMeta) < 3 || len(respMeta) < 3 {
			Debug(2, logPrefix, "Skipping message with malformed meta")
			continue
		}

		req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(message.request.Data)))
		if reqErr != nil {
			Debug(2, logPrefix, "Error parsing request", byteutils.SliceToString(reqMeta[1]), reqErr.Error())
			continue
		}
		resp, respErr := http.ReadResponse(bufio.NewReader(bytes.NewReader(message.response.Data)), req)
		if respErr != nil {
			Debug(2, logPrefix, "Error parsing response", byteutils.SliceToString(respMeta[1]), respErr.Error())
			continue
		}

		// Skip the string conversions unless they will be printed.
		if Settings.Verbose > 4 {
			Debug(5, logPrefix, "Processing Request:", byteutils.SliceToString(reqMeta[1]))
			Debug(6, logPrefix, byteutils.SliceToString(message.request.Data))
			Debug(5, logPrefix, "Processing Response:", byteutils.SliceToString(respMeta[1]))
			Debug(6, logPrefix, byteutils.SliceToString(message.response.Data))
		}

		reqTimestamp, reqTsErr := strconv.ParseInt(byteutils.SliceToString(reqMeta[2]), 10, 64)
		if reqTsErr != nil {
			Debug(2, logPrefix, "Error parsing request timestamp", reqTsErr.Error())
		}
		respTimestamp, respTsErr := strconv.ParseInt(byteutils.SliceToString(respMeta[2]), 10, 64)
		if respTsErr != nil {
			Debug(2, logPrefix, "Error parsing response timestamp", respTsErr.Error())
		}

		// Elapsed time between request and response, in milliseconds. An
		// unparseable timestamp reads as 0 and would produce a meaningless
		// difference, so report 0 in that case; also clamp negatives.
		var interval int64
		if reqTsErr == nil && respTsErr == nil {
			interval = (respTimestamp - reqTimestamp) / nanosPerMilli
			if interval < 0 {
				interval = 0
			}
		}

		// NOTE(review): start time is passed in milliseconds; confirm that is
		// the unit SendHttpMessage expects.
		resurfaceLogger.SendHttpMessage(o.rLogger, resp, req, respTimestamp/nanosPerMilli, interval, nil)
	}
}
说明
resurfaceio goreplay 的实现并不难,实际上我们也可以基于此实现一个支持 graylog gelf 协议的 output 插件
参考资料
https://resurface.io/docs
https://github.com/resurfaceio/goreplay
https://github.com/resurfaceio/logger-go