首页 > 其他分享 >基于WAF+Splunk+FW简单SOAR平台搭建

基于WAF+Splunk+FW简单SOAR平台搭建

时间:2024-07-16 11:52:11浏览次数:17  
标签:log FW WAF cmdlist file Splunk path os

基于WAF+Splunk+FW简单SOAR平台搭建

目录

1 基于WAF+Splunk简单SOC平台搭建

1.1 准备环境

  • Splunk安装
  • WAF设备
  • WAF到Splunk的syslog请自行打通

1.2 配置Splunk

WAF的日志发送请依据不同厂商进行配置。

1.2.1 配置数据源(从网络端口获取)

当添加新的网络端口输入时,也就是在后台向input.conf文件添加新的输入配置节点,Splunk服务器可包含一个或多个input.conf文件,文件位于$SPLUNK_HOME/etc/system/local,或Splunk应用程序的local目录。

  • 选择:"设置"--> 数据分类:"来源类型" --> "新建来源类型" --> 配置如下

    image-20240715171910884

  • 选择:"设置"--> "数据输入" --> "UDP" --> 配置如下

    image-20240715172028789

    image-20240715172209576

1.2.2 创建应用

Splunk 应用程序(或APP)可被看作是专门用于解决某种用例的工作区。

  • 选择:"应用"--> "管理应用" --> "新建应用" --> 配置如下

    image-20240715165530149

1.2.3 创建警报

Splunk 的警报由底层搜索提供支持。这些底层搜索按计划执行历史索引的数据或在数据实时导入Splunk时实时进行。警报可以在搜索每次执行的时候触发,也可以在搜索结果符合某些条件时触发。

image-20240715173149842

image-20240715173210239

  • 选择:"应用"--> "SecOps" --> "搜索",配置如下:

    index="index_netsec" "Traffic@waf-access" | rex field=_raw "客户IP:(?<src_ip>\d+\.\d+\.\d+\.\d+).*站点名称:(?<site>.*) 协议.*资源路径:(?<site_path>.*) 查询字符串.*(?<evial_tool>(jndi|nmap))" | stats dc(jndi) AS count_01,dc(site_path) as count_02 by src_ip | where count_01>0 Or count_02>70
    
    • index="index_netsec"Splunk 中所有数据都保存在一个或多个索引中。建议在搜索时指定索引,能得到更精确搜索结果。
    • "Traffic@waf-access":搜索与"Traffic@waf-access"有关的数据。
    • rex field=_raw "客户IP:(?<src_ip>\d+\.\d+\.\d+\.\d+).*站点名称:(?<site>.*) 协议.*资源路径:(?<site_path>.*) 查询字符串.*(?<jndi>(jndi|nmap))" : 创建自定义字段
    • stats dc(jndi) AS count_01,dc(site_path) as count_02 by src_ipstats命令将上一步的搜索结果放在管道左侧,并告知Splunk计算每个evial_tool,site_path的数量。
    • where count_01>0 Or count_02>70:找出访问User-Agent中包含jndi,nmap与访问路径数量超过70个的IP。
  • 回车或点击搜索图标后,点击""另存为"-->"告警"-->配置如下:添加触发的告警根据需要选择优先级,必选,否则不生效。

    image-20240715174422822

1.2.4 创建警报触发动作

  • $SPLUNK_HOME\bin\scripts目录创建runBanDirSearch.bat

    @echo off
    cd "C:\Program Files\Splunk\bin\scripts\"
    BanDirSearch.exe
    cmd
    

1.2.5 建议封禁的策略

  • 发现Web服务探测行为
  • Generic_scan
  • 发现黑客工具Nmap扫描行为
  • 发现扫描工具-zgrab
  • 发现NMAP探测行为(SSL)
  • Apache Log4j2 远程代码执行漏洞(CVE-2021-44228/CVE-2021-45046)
  • 敏感信息扫描(机器学习)
  • 发现黑客工具Masscan扫描行为
  • 发现黑客工具-绿盟扫描器扫描行为
  • 发现黑客扫描工具-httpx
  • 发现黑客工具Nessus扫描行为

2 基于FW+Splunk警报实现自动化运营

2.1 编写自动划封禁脚本

from splunklib import client
import splunklib.results as results

import csv
import sys
import os
import zipfile

import chardet
import paramiko
import time
import threading
import logging
from logging import handlers

import sqlite3


def mk_log_dir():
    """
    生成日志文件夹
    :return:
    """
    date_time = time.strftime('%Y%m%d', time.localtime())
    log_root_dir = 'BanIP_data_' + date_time

    if not os.path.exists(log_root_dir):  # 判断目标目录是否存在
        os.mkdir(log_root_dir)  # 如果不存在则创建目标目录
        print("已创建数据存放根目录:%s" % log_root_dir)
    log_time = time.strftime('%Y%m%d%H%M%S', time.localtime())
    log_dir = os.path.join(os.getcwd(), log_root_dir, '{}'.format('LogData_' + log_time))
    if not os.path.exists(log_dir):  # 判断目标目录是否存在
        os.mkdir(log_dir)  # 如果不存在则创建目标目录
        print("已创建数据存放目录:%s" % log_dir)
    return log_root_dir, log_dir


class Logger(object):
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }  # 日志级别关系映射

    def __init__(self, filename, level='info', when='D', back_count=3,
                 fmt='%(asctime)s - %(levelname)s: %(message)s'):
        self.logger = logging.getLogger(filename)
        format_str = logging.Formatter(fmt)  # 设置日志格式
        self.logger.setLevel(self.level_relations.get(level))  # 设置日志级别
        sh = logging.StreamHandler()  # 往屏幕上输出
        sh.setFormatter(format_str)  # 设置屏幕上显示的格式
        th = handlers.TimedRotatingFileHandler(filename=filename, when=when, backupCount=back_count,
                                               encoding='utf-8')  # 往文件里写入#指定间隔时间自动生成文件的处理器
        # 实例化TimedRotatingFileHandler
        # filename是日志名
        # when是间隔时间单位
        # backupCount是要保留几个分割后的日志文件
        # interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
        # S 秒
        # M 分
        # H 小时、
        # D 天、
        # W 每星期(interval==0时代表星期一)
        # midnight 每天凌晨
        th.setFormatter(format_str)  # 设置文件里写入的格式
        self.logger.addHandler(sh)  # 把对象加到logger里
        self.logger.addHandler(th)


log = Logger('BanIP.log', level='debug')


def check_code(file):
    """
    检查文件所属编码
    :param file:文件名称
    :return:文件所属编码
    """
    file_text = open(file, 'rb').readline()
    file_code = chardet.detect(file_text)['encoding']
    # 由于windows系统的编码有可能是Windows-1254,打印出来后还是乱码,所以不直接用adchar['encoding']编码
    if file_code == 'gbk' or file_code == 'GBK' or file_code == 'GB2312':
        file_code = 'GB2312'
    else:
        file_code = 'utf-8'
    return file_code


def get_all_file_path(dirpath, filepathlist):
    for filename in os.listdir(dirpath):
        if os.path.isfile(os.path.join(dirpath, filename)):
            filepath = os.path.join(dirpath, filename)
            filepathlist.append(filepath)
        elif os.path.isdir(os.path.join(dirpath, filename)):
            get_all_file_path(os.path.join(dirpath, filename), filepathlist)
        else:
            print("跳过文档: ", filename)


def Zip_File(ZIPFILE, ZipFileDir):
    LogZip = zipfile.ZipFile(ZIPFILE, 'w', zipfile.ZIP_DEFLATED)  # 压缩日志
    logfilelist = []
    get_all_file_path(dirpath=ZipFileDir, filepathlist=logfilelist)
    for file in logfilelist:
        LogZip.write(file)
    # os.remove(ZipFileDir + '\\' + filename)  # 压缩完成后删除文件,以便后续删除原始目录
    LogZip.close()
    # os.removedirs(ZipFileDir)  # 压缩成功后,删除原始目录
    return ZIPFILE


def ssh_login(hostname, host, user, passwd, port=22):
    """
    实例化ssh登陆
    :return:
    """
    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
    try:
        ssh_client.connect(hostname=host, username=user, password=passwd, port=port, timeout=10,
                           look_for_keys=False, allow_agent=False)
    except (TimeoutError, Exception) as e:
        message = '{}\t无法访问,登录超时。'.format(hostname) + str(e)
        log.logger.error(message)
        pool_sema.release()  # 解锁
        sys.exit(1)
    else:
        message = '{}\t登录成功。'.format(hostname)
        log.logger.warning(message)
        return ssh_client


def process_ssh_patrol_data(host, session, cmds: list, savefile):
    """
    :param session: 实例化的ssh
    :param cmds: 命令列表
    :param savefile: 需要保存的文件名称
    :return:
    """
    # invoke_shell()函数类似shell终端,可以将执行结果分批次返回,看到任务的执行情况,不会因为执行一个很长的脚本而不知道是否执行成功
    shell = session.invoke_shell()
    for patrol_cmd in cmds:
        print('正在处理{0}: {1}'.format(host, patrol_cmd))
        shell.send(patrol_cmd + '\n')
        if "show" or "display " in patrol_cmd.lower():
            time.sleep(1)
        # 确认是否还有数据需要返回
        recv_flag = shell.recv_ready()
        while recv_flag:
            result = shell.recv(65535).decode(encoding='utf-8', errors='ignore')
            savefile.write(result)
            savefile.flush()
            # time.sleep(0.5)
            recv_flag = shell.recv_ready()
    shell.close()


def get_cmdlist_from_splunk(conn, cursor):
    # 配置Splunk服务器信息
    HOST = "Splunk服务器hostIP"
    PORT = 8089
    USERNAME = "Splunk-Username"
    PASSWORD = "Splunk-Password"

    # 创建Service实例
    service = client.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD
    )

    # 设置查询语句
    # 查询5分钟前到此刻的数据
    kwargs_oneshot = {"latest": "-5m"}
    # 设置查询语句
    searchquery_oneshot = """search index="index_netsec" "Traffic@waf-access" \
    | rex field=_raw "客户IP:(?<src_ip>\d+\.\d+\.\d+\.\d+).*站点名称:(?<site>.*) 协议.*资源路径:(?<site_path>.*) 查询字符串.*(?<jndi>(jndi|nmap))" \
    | stats dc(jndi) AS count_01,dc(site_path) as count_02 by src_ip \
    | where count_01>0 Or count_02>70"""
    # 运行并保存运行结果,以Json格式输出数据
    oneshotsearch_results = service.jobs.oneshot(searchquery_oneshot, **kwargs_oneshot, output_mode='json')
    # 使用JSONResultsReader方法获取数据
    reader = results.JSONResultsReader(oneshotsearch_results)
    iplist = []
    for result in reader:
        if isinstance(result, results.Message):
            # 调试数据使用会话形式展示
            log.logger.info(f'{result.type}: {result.message}')
        elif isinstance(result, dict):
            # Normal events are returned as dicts
            banDate = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            sql_search = cursor.execute("SELECT IP FROM BanIP WHERE IP = '%s';" % result['src_ip'].strip()).fetchone()
            if sql_search is None:
                sql_insert = "INSERT INTO BanIP (IP,Ban_date) select '{0}','{1}';".format(result['src_ip'].strip(),
                                                                                          banDate)
                cursor.execute(sql_insert)
                conn.commit()
                iplist.append('ip {}/32'.format(result['src_ip'].strip()))
                log.logger.info(f'获取如下更新数据:\n\t' + '\n\t'.join(ip for ip in iplist))
    conn.close()
    return iplist


def run(cmdlist, hostIP, hostname, port=22):
    """
    获取返回数据
    :return:
    """
    user = 'FW-Username'
    passwd = 'FW-Password'
    session = ssh_login(hostname, hostIP, user, passwd, port)
    # 命令信息采集
    with open(logDir + '\\' + hostIP + '_' + hostname + '.txt', 'w', encoding='utf-8',
              newline='') as patrol_data:
        process_ssh_patrol_data(host=hostIP, session=session, cmds=cmdlist, savefile=patrol_data)
        log.logger.info('%s 命令执行完成' % hostIP)
    session.close()
    pool_sema.release()  # 解锁


if __name__ == '__main__':

    # user = input('请输入用户名:')
    # passwd = getpass.getpass('请输入密码:')
    # while True:
    #     try:
    #         thread_num = int(input('同时处理的设备数量(默认5台):') or 5)
    #     except:
    #         print('请输入数字后回车!')
    #         continue
    conn = sqlite3.connect('BanIP.db')
    cursor = conn.cursor()
    # 数据库创建表可以自动执行
    cursor.execute("""CREATE TABLE IF NOT EXISTS BanIP (
            ID INTEGER PRIMARY KEY AUTOINCREMENT,
            IP VARCHAR(20),
            Ban_date VARCHAR(20))""")

    cmdlist = get_cmdlist_from_splunk(conn, cursor)
    if len(cmdlist) == 0:
        log.logger.info('无更新数据,程序退出。')
        sys.exit(0)
    else:
        cmdlist.append('show config address "Addr_HWBlackIP_01"')
        cmdlist.append('save')
        cmdlist.append('y')
        cmdlist.append('\n')
        cmdlist.insert(0, 'terminal length 0')
        cmdlist.insert(1, 'terminal width 512')
        cmdlist.insert(2, 'config')
        cmdlist.insert(3, 'address "Addr_HWBlackIP_01"')
        thread_num = 5
        device_file = r'.\device_list.csv'
        threads_list = []
        logRootDir, logDir = mk_log_dir()
        pool_sema = threading.BoundedSemaphore(thread_num)
        with open(device_file, 'r', encoding='utf-8') as f:
            data = csv.reader(f)
            next(data)
            for dev_data in data:
                try:
                    pool_sema.acquire()
                    mythread = threading.Thread(
                        target=run, args=(cmdlist, dev_data[3], dev_data[1], dev_data[4]))
                    mythread.start()
                    threads_list.append(mythread)
                except IndexError:
                    pass
        for t in threads_list:
            t.join()

        print('正在打包日志数据。。。')
        Zip_File(ZIPFILE=logRootDir + ".zip", ZipFileDir=logRootDir)
        print('打包日志数据完成!')

  • 自动去读取事件警报的数据,保存在数据库中的同时,也对已有的数据进行筛选,去除重复项后,生成IP封禁名单。

  • 读取当前目录下文件名为device_list.csv文件,用于在对应防火墙上封禁对应策略。此处只针对于山石防火墙。

    序号,厂商,设备名称,管理IP,端口
    1,test,hillstone,192.168.111.2,2222
    

2.2 打包脚本为可执行文件

  • pyinstaller.exe -F .\BanDirSearch.py

当事件触发时,即可触发脚本执行。

标签:log,FW,WAF,cmdlist,file,Splunk,path,os
From: https://www.cnblogs.com/f-carey/p/18304871

相关文章

  • WAF基础介绍
    WAF一、WAF是什么?WAF能够做什么二waf的部署三、WAF的工作原理一、WAF是什么?WAF的全称是(WebApplicationFirewall)即Web应用防火墙,简称WAF。国际上公认的一种说法是:Web应用防火墙是通过执行一系列针HTTP/HTTPS的安全策略来专门为Web应用提供保护的一款产品。部署......
  • 用Goaccess对Web及雷池WAF日志实现可视化分析
    君衍.一、项目环境介绍二、Goaccess1、Goaccess介绍2、存储方式3、配置选项4、自定义日志/日期格式5、特殊格式说明符三、雷池访问日志1、配置文件改变2、docker配置3、示例测试四、Goaccess安装1、安装依赖2、编译安装五、Goaccess对Nginx日志分析1、常用命令参数2、......
  • Waf绕过
    设备类型由上到下,waf的检测细腻度依次降低网络层WAF:先拦截流量,进行检测后再转发给应用层WAF:先经过apache/nginx解析后再交给php处理云WAF(CDN+WAF):简单的看成CDN加上软件WAF的结合体,既可以抗住DDos攻击,也可以过滤出部分简单的Payload攻击代码,甚至对流量也有一定的清洗作用......
  • ubutu 安装雷池waf
    系统环境:ubuntu20.04 宝塔管理面板,并安装Nginx由于宝塔防火墙需要收费,小网站不想购买,最后找到雷池waf社区版。雷池技术架构 第一步:我采用的是自动安装:并在同一台服务器上bash-c"$(curl-fsSLkhttps://waf-ce.chaitin.cn/release/latest/setup.sh)"第二步:我采用......
  • 银河麒麟v10(Sword)(aarch64架构)安装zhongkui-waf
    银河麒麟v10(Sword)(aarch64架构)安装zhongkui-waf系统:KylinLinuxAdvancedServerreleaseV10(Sword)waf官网地址:https://github.com/bukaleyang/zhongkui-waf需要安装的组件:OpenResty、ZhongKui、libmaxminddb和geoipupdate官方提供了install.sh脚本可以安装,但是脚本......
  • zhongkui-waf具体配置说明
    zhongkui-waf具体配置说明Zhongkui-WAF内置了管理界面,但你依然可以通过直接修改相应配置文件来进行自定义配置。Zhongkui-WAF的基本配置在/conf/zhongkui.conf文件中,你可以对它进行修改。ip黑名单列表可以配置在/conf/zhongkui.conf文件中,也可以配置在path-to-zhongkui-waf/rul......
  • WAF指纹识别
    什么是WAF定义WebApplicationFirewall---Web应用防火墙过滤HTTP或者HTTPS的请求,识别并拦截恶意的请求类型硬件型WAF云WAF软件型WAF常见WAF厂商各种云:阿里云、腾讯云、华为云、百度云……安全狗、宝塔、360、知道创宇、长亭、安恒…WAF的识别思路方法1.额......
  • 解决网站使用WAF后无法获取用户真实IP地址的问题
    请支持原文原创内容❤️:解决网站使用WAF后无法获取用户真实IP地址的问题|BOBOBlog10/06/2024在部署Web应用防火墙(WAF)以增强WordPress网站的安全性后,无法获取用户的真实IP地址,文章则详细介绍了如何在Nginx和Apache服务器上配置以解决这一问题。Nginx,如何使用set_real_ip_from......
  • 信息打点-协议应用_内网资产_CDN_WAF_负载均衡_防火墙
    服务信息获取-协议应用&内网资产常见端口默认对应的服务:特殊服务端口:端口扫描工具:旁注查询旁注查询,又称为旁站查询或同服务器网站查询,是一种信息安全和网络侦查技术,主要用于发现与目标网站托管在同一服务器上的其他网站。这种查询的目的通常与网络安全测试、情报收集......
  • 常见的WAF攻击类型
    SQL注入、跨站脚本攻击、网页木马上传、命令/代码注入、文件包含、敏感文件访问、第三方应用漏洞攻击、CC攻击、恶意爬虫扫描、跨站请求伪造等攻击,保护Web服务安全稳定。个人觉得:waf攻击的防御具有事后性,除非一些已经成熟的攻击类型和防御办法可以通过预制规则的方式建立起来。......