关于ELK 索引特定字段报警的特殊方式:ELK + python脚本。
FILEBEAT(日志采集)-->logstash(日志过滤)-->es(日志分类索引)-->kibana(展示)
邮件系统的日志比较复杂,包含众多类型的日志,比如用户登录日志、用户下载附件日志、垃圾邮件日志等。采集了大量数据之后,如何从中区分哪些是有用的信息,比如异常登录 IP、用户账号是否被盗用并被用来发送大量垃圾邮件等,对于运维人员来说非常重要。本案例展示如何通过 Python 脚本查询 ES 中的数据并实现微信报警功能。
Logstash 代码
Logstash 对收集到的日志进行处理,从中提取所需的字段(发送邮箱、接收邮箱、发送 IP 等),其余事件直接丢弃。
# Beats input: Filebeat ships the mail log to this port as JSON events.
# NOTE(review): no ssl/certificate settings are configured on this input, so
# the log travels in clear text and any host that can reach port 4562 can
# inject events into the pipeline — confirm the port is restricted to the
# Filebeat hosts.
input{
beats {
port => 4562
codec => json
}
}
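filter {
  # Only the antispam mail log is processed; everything else is left for the
  # output stage, which forwards nothing but type "antispamMail".
  if [type] == "antispamMail" {
    # "11500" is the log-line marker this pipeline keys on; lines without it
    # are dropped (else branch below).
    if "11500" in [message] {
      # Normalise the tab-separated line to "|" and split it into columns.
      # mutate always applies gsub before split, whatever order is written.
      mutate {
        gsub  => ["message", "\t", "|"]
        split => ["[message]", "|"]
      }
      # Column 4 is a ";"-separated list; items 4 and 5 carry the sender and
      # recipient mailboxes. NOTE(review): if the list is shorter, Logstash
      # leaves the unresolved "%{...}" reference in the field as a literal
      # string rather than failing — downstream consumers should expect that.
      if [message][4] {
        mutate {
          split => ["[message][4]", ";"]
          add_field => {
            "发送邮箱" => "%{[message][4][4]}"
            "接受邮箱" => "%{[message][4][5]}"
          }
        }
      }
      # The first item of column 4 is ":"-separated; its fifth piece is the
      # sending IP as recorded by the MTA.
      if [message][4][0] {
        mutate {
          split => ["[message][4][0]", ":"]
          add_field => { "发送IP" => "%{[message][4][0][4]}" }
        }
      }
      # Geo-locate public senders only. The dots are escaped on purpose: the
      # original unescaped "^10." also matched 100.x–109.x (and "^127." any
      # 127x prefix), so geoip was silently skipped for legitimate public
      # ranges and those senders never got a location in Kibana.
      if [发送IP] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
        geoip {
          source => "发送IP"
          fields => ["country_name", "continent_code", "region_code", "city_name", "location"]
          target => "geoip"
        }
      }
      # The raw (now split) message is no longer needed once the fields exist.
      mutate { remove_field => ["message"] }
    } else {
      drop {}
    }
  }
}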
# Only antispam events are indexed; other event types never reach the cluster.
output{
if [type] == "antispamMail" {
# NOTE(review): no user/password or ssl options are set here, so this is a
# plaintext, unauthenticated connection to the cluster — acceptable only on
# an isolated network; otherwise enable TLS and credentials.
elasticsearch {
hosts => ["XXXX:9200","XXXXXX:9200","XXXXX:9200"]
# Daily index. NOTE(review): Logstash renders %{+YYYY.MM.dd} from the event
# @timestamp in UTC, not the host timezone — any script that builds this
# index name must use the UTC date to find today's index.
index => "antispammail-%{+YYYY.MM.dd}"
codec => json
}
}
}
Python 微信脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import os
import sys
import urllib
import urllib.error
import urllib.parse
import urllib.request
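class WeChat(object):
    """Minimal WeChat Work (企业微信) client used to push text alerts.

    Flow: ``authID()`` exchanges corpid/corpsecret for an access token via
    ``gettoken``; ``sendMessage()`` then POSTs a text message to
    ``message/send`` with that token.

    corpid, secret and agentid are taken from the constructor arguments,
    else from the environment (``WECHAT_CORPID``, ``WECHAT_SECRET``,
    ``WECHAT_AGENTID``), else from the placeholder defaults — so the secret
    no longer has to be committed into this file.
    """

    __token_id = ''

    def __init__(self, url, corpid=None, secret=None, agentid=None, timeout=10):
        """
        Args:
            url: API base such as ``https://qyapi.weixin.qq.com/cgi-bin``;
                a trailing ``/`` is stripped.
            corpid, secret, agentid: explicit credentials / application id;
                ``None`` means "use the environment, then the placeholder".
            timeout: per-request socket timeout in seconds, so a hung API
                endpoint cannot stall the alerting script forever.
        """
        self.__url = url.rstrip('/')
        self.__corpid = corpid if corpid is not None else os.environ.get('WECHAT_CORPID', 'XXXXX')
        self.__secret = secret if secret is not None else os.environ.get('WECHAT_SECRET', 'XXXXXXXX')
        self.__agentid = agentid if agentid is not None else os.environ.get('WECHAT_AGENTID', 'XXXXXXX')
        self.__timeout = timeout

    def authID(self):
        """Fetch an access token and cache it on the instance.

        Raises:
            KeyError: the API answered without an ``access_token`` (e.g. bad
                corpid/secret); the API's errcode/errmsg are included so the
                failure is diagnosable instead of a bare ``KeyError``.
        """
        params = {'corpid': self.__corpid, 'corpsecret': self.__secret}
        content = self.getToken(urllib.parse.urlencode(params))
        try:
            self.__token_id = content['access_token']
        except KeyError:
            # Keep the exception type the original raised, but say why.
            raise KeyError('gettoken failed: errcode=%s errmsg=%s'
                           % (content.get('errcode'), content.get('errmsg'))) from None

    def getToken(self, data, url_prefix='/'):
        """GET ``gettoken?<data>`` and return the decoded JSON body.

        ``data`` is the url-encoded corpid/corpsecret query string, so the
        corp secret travels in the URL (that is how this API works): never
        log or echo this URL.
        """
        url = self.__url + url_prefix + 'gettoken?' + data
        with urllib.request.urlopen(url, timeout=self.__timeout) as resp:
            return json.loads(resp.read())

    def postData(self, data, url_prefix='/'):
        """POST ``data`` (UTF-8 JSON bytes) to ``message/send`` using the cached token.

        Returns the decoded JSON reply, or ``0`` on an HTTP error (kept for
        compatibility with the original). NOTE(review): the API reports most
        failures (expired token, wrong agentid) as HTTP 200 with a non-zero
        ``errcode``, so callers should inspect ``errcode`` in the returned dict.
        """
        url = self.__url + url_prefix + 'message/send?access_token=%s' % self.__token_id
        request = urllib.request.Request(url, data, headers={'Content-Type': 'application/json'})
        try:
            with urllib.request.urlopen(request, timeout=self.__timeout) as resp:
                return json.loads(resp.read())
        except urllib.error.HTTPError as e:
            # HTTPError always carries both code and reason; the original
            # if/elif only ever printed one of them.
            print('HTTP error', e.code, e.reason)
            return 0

    def _build_payload(self, message):
        """Return the ``message/send`` request body for a text ``message``."""
        return {
            'touser': "",
            'toparty': "9",
            'msgtype': "text",
            'agentid': self.__agentid,
            'text': {
                'content': message
            },
            'safe': "0"
        }

    def sendMessage(self, message):
        """Send ``message`` as a text message to party "9"; return the API reply.

        A fresh token is fetched on every call (``authID``), which is simple
        but costs an extra round-trip per alert. ``message`` is pushed
        verbatim; NOTE(review): WeChat caps text content length, so callers
        that build messages from log data should keep them short.
        """
        self.authID()
        body = json.dumps(self._build_payload(message), ensure_ascii=False)
        return self.postData(body.encode('utf-8'))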
def sendInfo():
    """Build the WeChat client pointed at the official WeChat Work API base."""
    api_base = 'https://qyapi.weixin.qq.com/cgi-bin'
    return WeChat(api_base)
ES查询脚本
查询最近 5 分钟内的数据;脚本本身不循环,需要配合 cron 等定时任务每 5 分钟执行一次,调度间隔大于 5 分钟会漏掉事件。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
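def build_query(window="now-5m"):
    """Return a range query selecting hits whose ``@timestamp`` is >= ``window``.

    Args:
        window: any Elasticsearch date-math expression ("now-5m", "now-1h").
            The default reproduces the original hard-coded five-minute
            window, which assumes the script is triggered every five
            minutes (e.g. by cron) — a slower schedule silently drops events.
    """
    return {"query": {"range": {"@timestamp": {"gte": window}}}}


# Default query used by get_es_info(): everything from the last five minutes.
query = build_query()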
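def get_es_info(index, es=None):
    """Return the ``_source`` of every hit in ``index`` matching the module ``query``.

    Args:
        index: index name or pattern to search (e.g. ``antispammail-2024.01.01``).
        es: an ``Elasticsearch`` client; when omitted one is created against
            the hard-coded host below (kept for compatibility).

    The scroll helper drains result sets larger than one page;
    ``clear_scroll`` releases the server-side scroll context afterwards.
    """
    if es is None:
        # NOTE(review): no http_auth / use_ssl is configured, so this is a
        # plaintext, unauthenticated connection — acceptable only on an
        # isolated network. Also note Logstash writes to :9200 while this
        # client reads from :9201; confirm both point at the same cluster.
        es = Elasticsearch(host='XX.XX.XX.XX', port=9201)
    hits = scan(client=es,
                query=query,
                index=index,
                request_timeout=30,
                raise_on_error=True,
                preserve_order=False,
                clear_scroll=True)
    # Consume the generator once instead of materialising it and looping again.
    return [hit['_source'] for hit in hits]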
发送邮箱账号被盗用信息微信
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import time,datetime
from sentInfo import *
from collections import Counter
from esUtils import *
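# Date suffix of today's Logstash index. NOTE(review): Logstash renders
# `%{+YYYY.MM.dd}` from the event @timestamp in UTC, not the host timezone,
# so the UTC date is used here; with localtime() on a UTC+8 host the script
# would look for a not-yet-existing index between 00:00 and 08:00 local time.
nowTime = time.strftime('%Y.%m.%d', time.gmtime())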
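def get_spammail_info(records=None, patterns=('XXXXXX', 'XXXXXX')):
    """Return ``(sender, sending_ip)`` pairs for records whose sender matches.

    Args:
        records: iterable of ES ``_source`` dicts as returned by
            ``get_es_info``; defaults to the module-global ``antiall`` set
            in ``__main__``.
        patterns: substrings; a record is selected when *any* of them occurs
            in its ``发送邮箱``. The original ``('a' or 'b') in x`` only ever
            tested the first pattern, because ``or`` on two non-empty strings
            returns the left operand.

    Records without a ``发送邮箱`` field (Logstash only adds it when the log
    line had the expected column) are skipped instead of raising KeyError.
    NOTE(review): the sender mailbox is attacker-supplied data and can be
    spoofed; the IP recorded by the MTA is the more reliable key for any
    automatic blocking built on top of this.
    """
    if records is None:
        records = antiall
    matched = []
    for rec in records:
        sender = rec.get('发送邮箱')
        if sender is None:
            continue
        if any(p in sender for p in patterns):
            matched.append((sender, rec.get('发送IP')))
    return matched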
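def _format_alert(mailbox, ip, count):
    """Render the WeChat text for one (mailbox, ip) pair seen ``count`` times."""
    return ('\t盗用邮箱发送垃圾邮件信息\n邮箱地址:\t{0}\nIP地址:\t{1}\n邮件数量:\t{2}'
            .format(mailbox, ip, count))


def send_message():
    """Push one WeChat alert per (sender, ip) pair returned by get_spammail_info().

    Pairs are aggregated with ``Counter`` so each alert carries how many
    messages that mailbox/IP sent in the window. Every pair costs one
    gettoken plus one send round-trip (see ``WeChat.sendMessage``), so a
    burst of compromised accounts fans out into a burst of alerts.
    """
    wechat = sendInfo()
    pairs = Counter(get_spammail_info())
    # The original guarded each item with `if j:`; a (key, count) tuple is
    # always truthy, so that check never filtered anything and is dropped.
    for (mailbox, ip), count in pairs.items():
        wechat.sendMessage(_format_alert(mailbox, ip, count))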
if __name__ == "__main__":
    # Today's daily index, named exactly as the Logstash output writes it.
    index = f"antispammail-{nowTime}"
    # Module global read by get_spammail_info().
    antiall = get_es_info(index)
    send_message()
警报信息如下图
当然,除了发送警报之外,还可以把异常 IP 自动加入防火墙黑名单,并发邮件通知用户其账号已被盗用,基本可以实现半自动化,减少人工天天盯着 Kibana 页面的烦恼。需要注意的是,自动封禁依赖的是日志中的发件人和 IP 字段,发件地址可以被伪造,封禁前应设置白名单(如公司出口 IP、合作方 MTA)和阈值,避免误封正常用户。