首页 > 编程语言 >ELK + Python 实现警报

ELK + Python 实现警报

时间:2023-12-19 17:06:05浏览次数:35  
标签:ELK .__ Python self urllib url 警报 import message

关于ELK 索引特定字段报警的特殊方式:ELK + python脚本。

FILEBEAT(日志采集)-->logstash(日志过滤)-->es(日志分类索引)-->kibana(展示)

邮件系统的日志比较复杂,包含众多的日志,比如用户登录日志、用户下载附件日志、垃圾邮件垃圾日志等采集大量的数据如何区分那些是有用的信息,比如异常登录IP信息、用户账号是否被盗用并转发大量的垃圾邮件信息等,对于运维人来说非常重要。本案例展示如何通过python脚本查询es信息并实现微信报警功能。

Logstash 代码

logstash针对收集到的日记进行处理,提取所需日志文件。

input{
    beats {
    port => 4562
     codec => json
  }
}

filter {
    if [type] == "antispamMail"{
        if "11500" in [message]{
            mutate {
                gsub => ["message","\t","|"]
                split => ["[message]","|"]
            }
        if [message][4]{
            mutate {
                split => ["[message][4]",";"]
                add_field => { 
                    "发送邮箱" => "%{[message][4][4]}"
                    "接受邮箱" => "%{[message][4][5]}"
                }
            }
        }

        if [message][4][0] {
            mutate {
                split => ["[message][4][0]",":"]
                add_field => { "发送IP" => "%{[message][4][0][4]}"}
            }
        }

        if [发送IP] !~ "^127.|^192.168.|^172.1[6-9].|^172.2[0-9].|^172.3[01].|^10." { 
                    geoip {
                      source  => "发送IP"
                      fields => ["country_name", "continent_code", "region_code", "city_name","location"]
                      target => "geoip"
                    }
         }
         mutate { remove_field => ["message"] }

        } else { drop{} }
    }
}

output{
    if [type] == "antispamMail" {
        elasticsearch {
          hosts => ["XXXX:9200","XXXXXX:9200","XXXXX:9200"]
          index => "antispammail-%{+YYYY.MM.dd}"
          codec => json
        }
      } 
}

Python 微信脚本

#!/usr/bin/env python3 
# -*- coding: utf-8 -*-

import urllib,json
import urllib.parse
import urllib.request
import sys


class WeChat(object):
    __token_id = ''
    def __init__(self,url):
        self.__url = url.rstrip('/')
        self.__corpid = 'XXXXX'
        self.__secret = 'XXXXXXXX'
	
    def authID(self):
        params = {'corpid':self.__corpid, 'corpsecret':self.__secret}
        data = urllib.parse.urlencode(params)
        content = self.getToken(data)
        try:
            self.__token_id = content['access_token']
        except KeyError:
            raise KeyError

    def getToken(self,data,url_prefix='/'):
        url = self.__url + url_prefix + 'gettoken?'
        try:
            response = urllib.request.Request(url + data)
        except KeyError:
            raise KeyError
        result = urllib.request.urlopen(response)
        content = json.loads(result.read())
        return content

    def postData(self,data,url_prefix='/'):
        url = self.__url + url_prefix + 'message/send?access_token=%s' %self.__token_id
        request = urllib.request.Request(url,data)
        try:
            result = urllib.request.urlopen(request)
        except urllib.request.HTTPError as e:
            if hasattr(e,'reason'):
                print ('reason',e.reason)
            elif hasattr(e,'code'):
                print ('code',e.code)
            return 0
        else:
            content = json.loads(result.read())
            result.close()
        return content

    def sendMessage(self,message):
        self.authID()
        data = json.dumps({
		'touser':"",
                'toparty':"9",
		'msgtype':"text",
		'agentid':"XXXXXXX",
		'text':{
			'content':message
			},
	        'safe':"0"
		},ensure_ascii=False)
        response = self.postData(data.encode('utf-8'))
        
def sendInfo():
    weixinInfo = WeChat('https://qyapi.weixin.qq.com/cgi-bin')
    return weixinInfo

ES查询脚本

** 定义5分钟查询一次

#!/usr/bin/env python3 
# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

query = {
      "query": {
        "range": {
          "@timestamp": {
            "gte": "now-5m"   
              }
            }
          }
        }

def get_es_info(index):
    temp = []
    es = Elasticsearch(host='XX.XX.XX.XX', port=9201)
    rel = scan(client=es            
            ,query=query                            
            ,index=index
            ,request_timeout=30
            ,raise_on_error=True
            ,preserve_order=False
            ,clear_scroll=True)
    
    result = list(rel)
    for hit in result:
        temp.append(hit['_source'])
    return temp

发送邮箱账号被盗用信息微信

#!/usr/bin/env python3 
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import time,datetime
from sentInfo import * 
from collections import Counter
from esUtils import *

nowTime=time.strftime('%Y.%m.%d',time.localtime())


def get_spammail_info():
    mail_res = []
    for  i in  antiall:
        if ('XXXXXX' or'XXXXXX') in i['发送邮箱']:
            mail_res.append((i['发送邮箱'],i['发送IP']))
    return mail_res


def send_message():
    info = sendInfo()
    res  = get_spammail_info()
    for j in Counter(res).items():
        if j:
            info.sendMessage('\t盗用邮箱发送垃圾邮件信息\n邮箱地址:\t{0}\nIP地址:\t{1}\n邮件数量:\t{2}' \
                                    .format(j[0][0],j[0][1],j[1]))
                                                                     
if __name__ == "__main__":
    index = "antispammail-{0}".format(nowTime)
    antiall = get_es_info(index)
    send_message()

警报信息如下图

image.png

当然除了处理警报问题,还可以通过将非法IP自动添加到防护墙黑名单,发送邮件通知用户邮件账号被盗信息,基本可以实现半自动化,减少人工天天盯着kibana页面的烦恼。

标签:ELK,.__,Python,self,urllib,url,警报,import,message
From: https://blog.51cto.com/arckyli/8890904

相关文章

  • 深入解析 Python 中的对象创建与初始化:__new__ 与 __init__ 方法
    Python中的面向对象编程涉及许多特殊方法,其中__new__和__init__是两个关键的方法。它们分别负责对象的创建和对象的初始化,在对象的生命周期中扮演着不同而又互补的角色。让我们深入探讨这两个方法,了解它们的作用、区别以及如何在实际开发中应用。1. __new__方法当谈到Pyth......
  • 使用XPath进行网页爬取的Python实现
    XPath是一种用于在XML和HTML文档中进行导航和查询的语言。在网页爬取中,XPath可以帮助我们定位和提取特定的网页元素,从而实现数据的抓取和提取。本文将介绍如何使用Python中的XPath库来进行网页爬取。1.安装依赖库:在使用XPath进行网页爬取之前,我们需要安装相关的依赖库。Python中常......
  • Python中使用del删除列表元素的原理解析
    Python是一种功能强大的编程语言,提供了许多方便的操作列表的方法。其中,使用del关键字可以删除列表中的某个元素。本文将解析Python中使用del删除列表元素的原理,帮助您理解其工作原理和使用方法。1.列表是可变对象:在Python中,列表是一种可变对象,即可以在原地修改的对象。与不可变对象......
  • python-bytes型和string型的转换
    https://blog.csdn.net/weixin_43936250/article/details/124410127数据加解密时通常是以bytes形式存储,加解密算法运行前需要先对数据进行处理。以SM4算法示例数据为例,待加密数据为:0123456789abcdeffedcba9876543210,bytes类型则有两种表示方式message1=b'0123456789abcdeffed......
  • skywalking对接python
    1.官网:https://skywalking.apache.org/docs/skywalking-python/next/readme/2.安装pipinstall"apache-skywalking"3.集成到flask,启动服务fromflaskimportFlask,request,render_templatefromupload_file_to_s3importuploads3,get_md5fromskywalkingimp......
  • 腾讯云api-python调用
    https://cloud.tencent.com/document/product/1278/46716#-*-coding:utf-8-*-importhashlib,hmac,json,os,sys,timefromdatetimeimportdatetime#密钥参数#需要设置环境变量TENCENTCLOUD_SECRET_ID,值为示例的AKIDz8krbsJ5yK**********mLPx3EXAMPLEsecret_......
  • python 之 LDAP 用户统一认证登录
    pipinstallldap3#环境安装fromldap3importServer,Connection,SUBTREEldap_host='xx.xx.x.x'#ldap服务器地址ldap_port=389#默认389ldap_admin_user='xx'#ldap管理员账户用户名ldap_admin_password='xxx'#ldap管理员账户密码ldap_base_search......
  • Request+Python微博爬虫实战
    1Request爬虫基础Request爬虫基本步骤:1、构造URL;2、请求数据;3、解析数据;4、保存数据例:爬取豆瓣某图片importrequests#第1步:构造URLurl='https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2624516210.jpg'#第2步:请求数据r=requests.get(url)#第3步:解......
  • python_01_list_structure
    sort&&sortedsort作用于list,返回None,对list本身进行排序sorted作用于list,返回一个排序好的列表,原列表顺序不作处理;(PS:sorted作用于可迭代对象,都生成一个排序好的列表)>>>l=[1,2,3,5,6,7,6,5,4,3,2]>>>l.sort()>>>l[1,2,2,3,3,4,5,5,6,6,7]>>>k=[1,2,......
  • Python 潮流周刊第一季完结(1~30)
    你好,我是猫哥。庆祝Python潮流周刊在几天前顺利更新到了第30期!我觉得这是一个很有意义的时间节点,不太短也不漫长,很适合作一个小小的总结。我打算今后每30期作为一季,都给大家做一些总结和分享。首先,给大家公开一些数据吧。本季时间从2023.05.13到2023.12.09,共210天。曾有......