首页 > 编程语言 >使用UDP协议实现简单的分布式日志服务, java和python

使用UDP协议实现简单的分布式日志服务, java和python

时间:2022-11-21 11:33:04浏览次数:50  
标签:UDP java socket python self logging import 日志


使用UDP协议实现简单的分布式日志服务, java和python

这几天系统出现问题, 需要查原因. 日志分散在各个服务器上, 查起来很要命.
网上百度了好久, 最后发现, 各种日志的处理方式都略显纷繁复杂了.

  1. 有的是syslog
  2. 有的是用 influxdb 实现日志存储和查询.
  3. 收费的则还有阿里云的SLS https://www.aliyun.com/product/sls/
  4. 有的是用工具链. ELK .其中 Elasticsearch 又是相当巨大的性能开销. 和各种安装配置. 为了一个简单的日志 , 实在是有点浪费资源.

在我看来,日志最主要的作用还是在出错的时候查找错误信息, 跟踪和分析系统性能. 对于分布式系统来讲又增加了一个 “集中” 日志管理的功能需求.方便查日志. 系统7-8个, 查日志一个文件一个文件的翻很慢的. 对于日志信息, 实际上并无太高要求.当查错误的时候, 能够方便的查询到信息即可. 对于日志的丢失和实时,以及查询性能都无太高要求.

如果要想基于日志系统进行系统业务逻辑层面得扩展, 那么我认为这种想法是不科学的. 毕竟日志是不严谨的. 而且日志的正则分析和匹配都是相当大的性能开销.

所以日志就让它回归到日志本身.

我的设计是, 各个分布式服务, 通过UDP协议(很简单易编程)发送到日志服务. . 日志服务器则直接保存到本地文件中.这个日志服务器只起到了收集日志的作用, 如果要想查询和检索, 则将日志文件通过ssh 复制到本地.然后用vscode进行搜索和查询也是非常方便的(现在谁的电脑上还没有个vscode?), 如果不用则留在服务器上不用管. 只是存在硬盘上. 也不会占用太多资源,

日志服务端代码

日志服务本身很简单. 几句话就可以实现.代码如下.
用的是python语言. java语言的还没实现, 有精力的小伙伴可以实现以下. 实现后联系我.分享一下.

import logging
from logging import handlers
import asyncio
import time

# from socket import *

import socket


class LogServer_UDP(object):
def __init__(self, host, port):
# PORT = 9000
ADDR = (host, port) #地址与端口
self.BUFSIZ = 4096 #接收数据缓冲大小
self.UDPServeSock=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建udp服务器套接字
self.UDPServeSock.bind(ADDR) #套接字与地址绑定-服务端特有

def run(self):
while True:
try:
while True:
msg, addr = self.UDPServeSock.recvfrom(self.BUFSIZ) #接收客户端发来的字节数组,data.decode()='char',data.upper()='bytes'
logging.info(addr[0] + ":"+str(addr[1])+"=>" + msg.decode("utf8"))
pass

except Exception as e:
time.sleep(1)
print(e)

self.UDPServeSock.close() #关闭服务端socket

def namer(filename):
return filename

def main_logger():
# 日志集中处理区,在主程序中调用一次

# handlers配置区,filter可选
# formatter = logging.Formatter("%(name)s - %(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s")
formatter = logging.Formatter("%(message)s")
console = logging.StreamHandler()
console.setLevel(logging.INFO)

# info日志处理器
# filename:日志文件名
# when:日志文件按什么维度切分。'S'-秒;'M'-分钟;'H'-小时;'D'-天;'W'-周
# 这里需要注意,如果选择 D-天,那么这个不是严格意义上的'天',而是从你
# 项目启动开始,过了24小时,才会从新创建一个新的日志文件,
# 如果项目重启,这个时间就会重置。所以这里选择'MIDNIGHT'-是指过了午夜
# 12点,就会创建新的日志。
# interval:是指等待多少个单位 when 的时间后,Logger会自动重建文件。
# backupCount:是保留日志个数。默认的0是不会自动删除掉日志。
logfile = handlers.TimedRotatingFileHandler(
'./log/udp.log',
when='D',
backupCount=10,
encoding='utf-8')
logfile.suffix = "%Y_%m_%d.log"
logfile.namer = namer
logfile.setLevel(logging.INFO)
logfile.setFormatter(formatter) # add formatter to ch


rootlog = logging.getLogger()
rootlog.setLevel(level=logging.INFO)
# rootlog.addHandler(log_file_handler)
# rootlog.addHandler(errorlog_file_handler)
rootlog.addHandler(console)
rootlog.addHandler(logfile)

# 设置监听的端口,并传递handlers
# loggerListener = ZMQListener("tcp://127.0.0.1:6666",*(ch,console))

# loggerListener.start() # 开启一个子线程处理记录器监听
# zq_listener = LogServer_UDP("127.0.0.1", 6666)
udp_listener = LogServer_UDP("0.0.0.0", 11385)
print("日志服务运行中,监听端口 11385")
udp_listener.run()


# 主进程调用一次,非阻塞
main_logger()

日志输出端 java语言

用的是logback的扩展.UDPLogAppender
在UDPLogAppender 中实现了数据上传到日志服务端的功能.

package com.qcd.DDD;

import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.Scanner;


@Slf4j
public class UDPLogAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

// Scanner input = new Scanner(System.in);
DatagramSocket socket;
DatagramPacket sendPacked;

// DatagramSocket datagramSocket = new DatagramSocket(); // 创建DatagramSocket
// DatagramPacket datagramPacket = new DatagramPacket(str.getBytes(),
// str.getBytes().length, InetAddress.getByName("127.0.0.1"), 1111); // 创建DatagramPacket(要发送的数据,数据的长度,Ip地址,端口)
// datagramSocket.send(datagramPacket); // 发送
// datagramSocket.close(); // 关闭
// ————————————————


@Override
public void start() {
super.start();
try {
this.socket = new DatagramSocket();
String hostaddr = InetAddress.getByName(this.host).getHostAddress();

sendPacked = new DatagramPacket(
new byte[2048], 2048,
new InetSocketAddress(hostaddr, this.port));
}
catch (UnknownHostException e){
e.printStackTrace();
}
catch (SocketException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

// encoder.setContext(context);
encoder.start();
encoder.setCharset(Charset.forName("utf8"));
}

String host;

public void setHost(String host) {
this.host = host;
}

int port;

public void setPort(int port) {
this.port = port;
}


PatternLayoutEncoder encoder;

public PatternLayoutEncoder getEncoder() {
return encoder;
}

public void setEncoder(PatternLayoutEncoder encoder) {
this.encoder = encoder;
}

@Override
protected void append(ILoggingEvent eventObject)
{
byte[] bytemsg = this.encoder.encode(eventObject);

//必须先转成字符串, 再转成utf8编码发出去,
// 注释原因, 后来在上面加了个 encoder.setCharset(Charset.forName("utf8"));
//String msg = new String(bytemsg);
//String msg = eventObject.getFormattedMessage();
try {
// sendPacked.setData(msg.getBytes("utf8"));
sendPacked.setData(bytemsg);
socket.send(sendPacked); // 发送
// socket.close(); // 关闭
} catch (Exception e) {
e.printStackTrace();
}
}

}

logback-spring.xml 的配置

<?xml version="1.0" encoding="UTF-8"?>  
<configuration>

<!-- 控制台 appender -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss} %-5level %logger{12}:%-5line - %msg%n</pattern>
<!--charset>UTF-8</charset -->
</encoder>
</appender>

<appender name="udp" class="com.qcd.DDD.UDPLogAppender">
<encoder>
<pattern>%d{HH:mm:ss} %-5level %logger{12}:%-5line - %msg%n</pattern>
</encoder>
<host>wuxi.ai.px82.com</host>
<port>11385</port>
</appender>

<!--控制台打印资源加载信息-->
<root level="info">
<appender-ref ref="udp"/>
<appender-ref ref="STDOUT"/>
<!-- <appender-ref ref="ERROR"/>
<appender-ref ref="WARN"/>
<appender-ref ref="INFO" /> -->
</root>
</configuration>

日志输出端, python语言,

用的是logging库.
LogConfig.py 文件内容如下

import logging
from logging.handlers import TimedRotatingFileHandler
from logging.handlers import RotatingFileHandler
import os
import colorlog
import traceback
import socket
import Config

log_colors_config = {
# 终端输出日志颜色配置
'DEBUG': 'white',
'INFO': 'cyan',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red',
}


class UDPHandler(logging.Handler):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
self.ADDR = (host, port)
# print(ADDR)
self.UDPCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建客户端套接字

def emit(self, record):
try:
msg = self.format(record)
# self.zq.send_string(msg, flags=zmq.NOBLOCK)
self.UDPCliSock.sendto(bytes(msg,'utf8'), self.ADDR)
except Exception:
traceback.print_exc()


# 滚动日志
def backrollLog():
def namer(filename):
return filename

LOG_FORMAT = "%(asctime)s %(levelname)-8s[%(name)s %(filename)s:%(lineno)d] %(message)s"
formatter = logging.Formatter(LOG_FORMAT)

color_format ='%(log_color)s%(asctime)s-%(name)s-%(filename)s:%(lineno)d-%(levelname)s-[msg]: %(message)s',

color_formater = colorlog.ColoredFormatter(
"%(asctime)s %(log_color)s%(levelname)-8s%(reset)s [%(name)s %(filename)s :%(lineno)d] %(message_log_color)s%(message)s",
secondary_log_colors={
'message': {
'ERROR': 'red',
'CRITICAL': 'red'
}
})




# 总开关
rootlog = logging.getLogger()
rootlog.setLevel(level=logging.INFO)

# pid = str(os.getpid())
#控制台输出INFO级别的信息
stream_handler = logging.StreamHandler() # 日志控制台输出
stream_handler.setFormatter(formatter)
stream_handler.setLevel(level=logging.INFO)
rootlog.addHandler(stream_handler)


IP = socket.gethostbyname(Config.Seting.LogHost)
udphandler = UDPHandler(IP, Config.Seting.LogPort)
udphandler.setFormatter(color_formater)
udphandler.setLevel(logging.INFO)
rootlog.addHandler(udphandler)

logging.error(" log error test ")
logging.info(" log info test ")
logging.warning(" log warning test ")
logging.debug(" log debug test ")




backrollLog()

python 语言要求在主程序中import 一次 上面的代码. …
正常使用

logtest.py

import Config
import LogConfig
import logging
import time


# LogConfig.backrollLog() , 不需要单独执行, import LogConfig的时候已经执行过

while(True):
logging.info("hahaha中文")
time.sleep(1)


标签:UDP,java,socket,python,self,logging,import,日志
From: https://blog.51cto.com/u_15353042/5873189

相关文章

  • Java 同步锁ReentrantLock与抽象同步队列AQS
    AbstractQueuedSynchronizer抽象同步队列,它是个模板类提供了许多以锁相关的操作,常说的AQS指的就是它。AQS继承了​​AbstractOwnableSynchronizer​​类,AOS用于保存线程对......
  • python代码规范工具
    文章目录​​一:Pycharm自动创建文件头部​​​​二:代码门禁​​​​三:CommitAngular规范​​一:Pycharm自动创建文件头部Pycham—>Preferences—>编辑器—>文件和代......
  • Java 同步锁ReentrantLock与抽象同步队列AQS
    AbstractQueuedSynchronizer抽象同步队列,它是个模板类提供了许多以锁相关的操作,常说的AQS指的就是它。AQS继承了​​AbstractOwnableSynchronizer​​类,AOS用于保存线程对......
  • 【Java】JDK5.0新增的创建多线程的方式:实现Callable接口,使用线程池
    1.实现Callable接口方式和实现Runnable接口相比call()可以有返回值。call()可以抛出异常,被外面的操作捕获,获取异常信息。Callable是支持泛型的。实现Callable接口......
  • Java工具库Guava本地缓存Cache的使用、回收、刷新、统计等示例
    场景Java核心工具库Guava介绍以及Optional和Preconditions使用进行非空和数据校验:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/127683387在上面引入Gua......
  • 轻松学会JavaScript事件
    文章目录​​事件与事件流​​​​事件监听(绑定事件方法)​​​​JavaScript事件​​​​鼠标事件​​​​表单事件​​​​键盘事件​​​​UI事件​​​​快速投票​​  ......
  • PyCharm、Python、Django安装以及环境变量配置
    开发Python有很多IDE工具,我选择了PyCharm工具,因为风格和AndroidStudio极其的相似,对于我来说这个工具很容易上手。一、PyCharmPyCharm官网下载地址:http://www.jetbrains.com/......
  • python - 模块1
    模块相当于库,有标准库和第三方库模块名不要跟库名一致print(sys.path)#打印环境变量print(sys.argv)#打印相对路径os模块#跟系统交互os.system("")#跟系统交......
  • 木马免杀代码篇之python反序列化分离免杀(一)
    前言本篇文章主要用到python来对CobaltStrike生成的Shellcode进行分离免杀处理,因此要求读者要有一定的python基础,下面我会介绍pyhon反序列化免杀所需用到的相关函数和......
  • python画动态爱心
    importrandomfrommathimportsin,cos,pi,logfromtkinterimport*CANVAS_WIDTH=640#画布的宽CANVAS_HEIGHT=480#画布的高CANVAS_CENTER_X=CANVA......