使用UDP协议实现简单的分布式日志服务, java和python
这几天系统出现问题, 需要查原因. 日志分散在各个服务器上, 查起来很要命.
网上百度了好久, 最后发现, 各种日志的处理方式都略显纷繁复杂了.
- 有的是syslog
- 有的是用 influxdb 实现日志存储和查询.
- 收费的则还有阿里云的SLS https://www.aliyun.com/product/sls/
- 有的是用工具链. ELK .其中 Elasticsearch 又是相当巨大的性能开销. 和各种安装配置. 为了一个简单的日志 , 实在是有点浪费资源.
在我看来,日志最主要的作用还是在出错的时候查找错误信息, 跟踪和分析系统性能. 对于分布式系统来讲又增加了一个 “集中” 日志管理的功能需求.方便查日志. 系统7-8个, 查日志一个文件一个文件的翻很慢的. 对于日志信息, 实际上并无太高要求.当查错误的时候, 能够方便的查询到信息即可. 对于日志的丢失和实时,以及查询性能都无太高要求.
如果要想基于日志系统进行系统业务逻辑层面得扩展, 那么我认为这种想法是不科学的. 毕竟日志是不严谨的. 而且日志的正则分析和匹配都是相当大的性能开销.
所以日志就让它回归到日志本身.
我的设计是, 各个分布式服务, 通过UDP协议(很简单易编程)发送到日志服务. . 日志服务器则直接保存到本地文件中.这个日志服务器只起到了收集日志的作用, 如果要想查询和检索, 则将日志文件通过ssh 复制到本地.然后用vscode进行搜索和查询也是非常方便的(现在谁的电脑上还没有个vscode?), 如果不用则留在服务器上不用管. 只是存在硬盘上. 也不会占用太多资源,
日志服务端代码
日志服务本身很简单. 几句话就可以实现.代码如下.
用的是python语言. java语言的还没实现, 有精力的小伙伴可以实现以下. 实现后联系我.分享一下.
import logging
from logging import handlers
import asyncio
import time
# from socket import *
import socket
class LogServer_UDP(object):
def __init__(self, host, port):
# PORT = 9000
ADDR = (host, port) #地址与端口
self.BUFSIZ = 4096 #接收数据缓冲大小
self.UDPServeSock=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建udp服务器套接字
self.UDPServeSock.bind(ADDR) #套接字与地址绑定-服务端特有
def run(self):
while True:
try:
while True:
msg, addr = self.UDPServeSock.recvfrom(self.BUFSIZ) #接收客户端发来的字节数组,data.decode()='char',data.upper()='bytes'
logging.info(addr[0] + ":"+str(addr[1])+"=>" + msg.decode("utf8"))
pass
except Exception as e:
time.sleep(1)
print(e)
self.UDPServeSock.close() #关闭服务端socket
def namer(filename):
return filename
def main_logger():
# 日志集中处理区,在主程序中调用一次
# handlers配置区,filter可选
# formatter = logging.Formatter("%(name)s - %(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s")
formatter = logging.Formatter("%(message)s")
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# info日志处理器
# filename:日志文件名
# when:日志文件按什么维度切分。'S'-秒;'M'-分钟;'H'-小时;'D'-天;'W'-周
# 这里需要注意,如果选择 D-天,那么这个不是严格意义上的'天',而是从你
# 项目启动开始,过了24小时,才会从新创建一个新的日志文件,
# 如果项目重启,这个时间就会重置。所以这里选择'MIDNIGHT'-是指过了午夜
# 12点,就会创建新的日志。
# interval:是指等待多少个单位 when 的时间后,Logger会自动重建文件。
# backupCount:是保留日志个数。默认的0是不会自动删除掉日志。
logfile = handlers.TimedRotatingFileHandler(
'./log/udp.log',
when='D',
backupCount=10,
encoding='utf-8')
logfile.suffix = "%Y_%m_%d.log"
logfile.namer = namer
logfile.setLevel(logging.INFO)
logfile.setFormatter(formatter) # add formatter to ch
rootlog = logging.getLogger()
rootlog.setLevel(level=logging.INFO)
# rootlog.addHandler(log_file_handler)
# rootlog.addHandler(errorlog_file_handler)
rootlog.addHandler(console)
rootlog.addHandler(logfile)
# 设置监听的端口,并传递handlers
# loggerListener = ZMQListener("tcp://127.0.0.1:6666",*(ch,console))
# loggerListener.start() # 开启一个子线程处理记录器监听
# zq_listener = LogServer_UDP("127.0.0.1", 6666)
udp_listener = LogServer_UDP("0.0.0.0", 11385)
print("日志服务运行中,监听端口 11385")
udp_listener.run()
# 主进程调用一次,非阻塞
main_logger()
日志输出端 java语言
用的是logback的扩展.UDPLogAppender
在UDPLogAppender 中实现了数据上传到日志服务端的功能.
package com.qcd.DDD;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.Scanner;
@Slf4j
public class UDPLogAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
// Scanner input = new Scanner(System.in);
DatagramSocket socket;
DatagramPacket sendPacked;
// DatagramSocket datagramSocket = new DatagramSocket(); // 创建DatagramSocket
// DatagramPacket datagramPacket = new DatagramPacket(str.getBytes(),
// str.getBytes().length, InetAddress.getByName("127.0.0.1"), 1111); // 创建DatagramPacket(要发送的数据,数据的长度,Ip地址,端口)
// datagramSocket.send(datagramPacket); // 发送
// datagramSocket.close(); // 关闭
// ————————————————
@Override
public void start() {
super.start();
try {
this.socket = new DatagramSocket();
String hostaddr = InetAddress.getByName(this.host).getHostAddress();
sendPacked = new DatagramPacket(
new byte[2048], 2048,
new InetSocketAddress(hostaddr, this.port));
}
catch (UnknownHostException e){
e.printStackTrace();
}
catch (SocketException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// encoder.setContext(context);
encoder.start();
encoder.setCharset(Charset.forName("utf8"));
}
String host;
public void setHost(String host) {
this.host = host;
}
int port;
public void setPort(int port) {
this.port = port;
}
PatternLayoutEncoder encoder;
public PatternLayoutEncoder getEncoder() {
return encoder;
}
public void setEncoder(PatternLayoutEncoder encoder) {
this.encoder = encoder;
}
@Override
protected void append(ILoggingEvent eventObject)
{
byte[] bytemsg = this.encoder.encode(eventObject);
//必须先转成字符串, 再转成utf8编码发出去,
// 注释原因, 后来在上面加了个 encoder.setCharset(Charset.forName("utf8"));
//String msg = new String(bytemsg);
//String msg = eventObject.getFormattedMessage();
try {
// sendPacked.setData(msg.getBytes("utf8"));
sendPacked.setData(bytemsg);
socket.send(sendPacked); // 发送
// socket.close(); // 关闭
} catch (Exception e) {
e.printStackTrace();
}
}
}
logback-spring.xml 的配置
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 控制台 appender -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss} %-5level %logger{12}:%-5line - %msg%n</pattern>
<!--charset>UTF-8</charset -->
</encoder>
</appender>
<appender name="udp" class="com.qcd.DDD.UDPLogAppender">
<encoder>
<pattern>%d{HH:mm:ss} %-5level %logger{12}:%-5line - %msg%n</pattern>
</encoder>
<host>wuxi.ai.px82.com</host>
<port>11385</port>
</appender>
<!--控制台打印资源加载信息-->
<root level="info">
<appender-ref ref="udp"/>
<appender-ref ref="STDOUT"/>
<!-- <appender-ref ref="ERROR"/>
<appender-ref ref="WARN"/>
<appender-ref ref="INFO" /> -->
</root>
</configuration>
日志输出端, python语言,
用的是logging库.
LogConfig.py 文件内容如下
import logging
from logging.handlers import TimedRotatingFileHandler
from logging.handlers import RotatingFileHandler
import os
import colorlog
import traceback
import socket
import Config
log_colors_config = {
# 终端输出日志颜色配置
'DEBUG': 'white',
'INFO': 'cyan',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red',
}
class UDPHandler(logging.Handler):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
self.ADDR = (host, port)
# print(ADDR)
self.UDPCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建客户端套接字
def emit(self, record):
try:
msg = self.format(record)
# self.zq.send_string(msg, flags=zmq.NOBLOCK)
self.UDPCliSock.sendto(bytes(msg,'utf8'), self.ADDR)
except Exception:
traceback.print_exc()
# 滚动日志
def backrollLog():
def namer(filename):
return filename
LOG_FORMAT = "%(asctime)s %(levelname)-8s[%(name)s %(filename)s:%(lineno)d] %(message)s"
formatter = logging.Formatter(LOG_FORMAT)
color_format ='%(log_color)s%(asctime)s-%(name)s-%(filename)s:%(lineno)d-%(levelname)s-[msg]: %(message)s',
color_formater = colorlog.ColoredFormatter(
"%(asctime)s %(log_color)s%(levelname)-8s%(reset)s [%(name)s %(filename)s :%(lineno)d] %(message_log_color)s%(message)s",
secondary_log_colors={
'message': {
'ERROR': 'red',
'CRITICAL': 'red'
}
})
# 总开关
rootlog = logging.getLogger()
rootlog.setLevel(level=logging.INFO)
# pid = str(os.getpid())
#控制台输出INFO级别的信息
stream_handler = logging.StreamHandler() # 日志控制台输出
stream_handler.setFormatter(formatter)
stream_handler.setLevel(level=logging.INFO)
rootlog.addHandler(stream_handler)
IP = socket.gethostbyname(Config.Seting.LogHost)
udphandler = UDPHandler(IP, Config.Seting.LogPort)
udphandler.setFormatter(color_formater)
udphandler.setLevel(logging.INFO)
rootlog.addHandler(udphandler)
logging.error(" log error test ")
logging.info(" log info test ")
logging.warning(" log warning test ")
logging.debug(" log debug test ")
backrollLog()
python 语言要求在主程序中import 一次 上面的代码. …
正常使用
logtest.py
import Config
import LogConfig
import logging
import time
# LogConfig.backrollLog() , 不需要单独执行, import LogConfig的时候已经执行过
while(True):
logging.info("hahaha中文")
time.sleep(1)