首页 > 其他分享 >批量查询威胁情报

批量查询威胁情报

时间:2024-07-25 13:53:30浏览次数:22  
标签:批量 format 情报 ip 查询 result file ti path

批量查询威胁情报

目录

1 编写目的

在提升蓝队进行对 IP 研判的效率工作时,通常有以下场景:

  1. 需要对多个攻击 IP 进行查询研判;
  2. 为了提高得到 IP 信息的准确程度,通常会到威胁情报共享平台查询关于IP的威胁情报。

2 实现目标

  1. 将整理成文档的可疑 IP 地址列表进行批量查询;
  2. 输出微步与奇安信TI情报库的信息对 IP 进行研判。

3 脚本实例

  1. 在脚本的目录下创建iplist.txt文档,并写入可疑 IP ,每个 IP 为一行;
  2. 执行脚本后会自动生成 ipinfo.xlsx 文档
import os
import sys
import time

from openpyxl import Workbook
import chardet
import requests


def check_code(f_path):
    """
    检查文件所属编码
    :param f_path:文件名称
    :return:文件所属编码
    """
    file_text = open(f_path, 'rb').readline()
    file_code = chardet.detect(file_text)['encoding']
    # 由于windows系统的编码有可能是Windows-1254,打印出来后还是乱码,所以不直接用adchar['encoding']编码
    if file_code == 'gbk' or file_code == 'GBK' or file_code == 'GB2312':
        file_code = 'GB2312'
    else:
        file_code = 'utf-8'
    return file_code


def existsfile(filename):
    if not os.path.exists(filename):  # 判断目标文件是否存在
        print('%s 不存在,请创建!' % filename)
        sys.exit(2)
    else:
        return filename


def save_result(x_result_parse: dict, f_path):
    # 将查询结果记录在csv中
    wb = Workbook()
    # 获取默认的工作表
    ws = wb.active
    ws.title = "ipinfo"
    i = 2
    ws["A1"].value = "序号"
    ws["B1"].value = "攻击IP"
    ws["C1"].value = "IP区域"
    ws["D1"].value = "微步情报"
    ws["E1"].value = "微步情报时间"
    ws["F1"].value = "奇安信情报"
    ws["G1"].value = "奇安信情报时间"
    for ip, ip_info in x_result_parse.items():
        ws["A{}".format(i)].value = str(i - 1)
        ws["B{}".format(i)].value = ip
        ws["C{}".format(i)].value = ip_info['location']
        ws["D{}".format(i)].value = ip_info['x_judgments']
        ws["E{}".format(i)].value = ip_info['x_update_time']
        ws["F{}".format(i)].value = ip_info['ti_judgments']
        ws["G{}".format(i)].value = ip_info['ti_update_time']
        i += 1
    wb.save(filename=f_path)
    print("报告收集完成,输出位置: {}".format(f_path))


def get_result(f_path):
    ip_info_dict = {}
    ip_file_code = check_code(f_path)
    with open(f_path, 'r', encoding=ip_file_code) as ip_file_content:
        iplist = [ip.strip() for ip in ip_file_content.readlines()]
    if len(iplist) > 60:
        print("超出微步数量限制,请缩小数量在60以下")
        sys.exit(1)
    """
        asn:asn信息。
        ports:端口信息。
        cas:SSL证书等信息。
        rdns_list:rdns记录信息。
        intelligences:威胁情报。
        judgments:从威胁情报中分析,综合判定的威胁类型。
        tags_classes:相关攻击团伙或安全事件信息标签等。
        samples:相关样本。
        update_time:情报最近更新时间。
        sum_cur_domains:当前指向域名的数量。
        scene:应用场景。
    """

    for query_ip in iplist:
        print("正在分析{} 中...".format(query_ip))
        x_url = "https://api.threatbook.cn/v3/ip/query"
        x_query = {
            "apikey": "threatbook_Api_key",
            "resource": "{}".format(query_ip),
            "exclude": "ports,cas,rdns_list,samples,sum_cur_domains,scene",
            "lang": "zh"
        }
        x_response = requests.request("GET", url=x_url, params=x_query)
        x_result = x_response.json()

        # Ti
        ti_url = "https://webapi.ti.qianxin.com/ip/v3/reputation?param=" + query_ip
        ti_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/76.0.3809.132 Safari/537.36",
            'Api-Key': 'ti_Api_key',
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6"
        }
        ti_response = requests.get(ti_url, headers=ti_headers)
        ti_result = ti_response.json()
        if x_response.status_code == 200 and ti_response.status_code == 200:
            # X情报社区奖励计划来源情报
            # 从威胁情报中分析,综合判定的威胁类型
            x_judgments = ",".join(x_result['data']['{}'.format(query_ip)]['judgments'])
            x_update_time = x_result['data']['{}'.format(query_ip)]['update_time']

            # 奇安信情报
            ti_geo_country = ti_result['data']['{}'.format(query_ip)]['geo']['country']
            ti_geo_city = ti_result['data']['{}'.format(query_ip)]['geo']['city']
            ti_geo = ti_geo_country + '-' + ti_geo_city
            ti_update_time = ti_result['data']['{}'.format(query_ip)]['summary_info']['latest_reputation_time']
            ti_judgments_reputation = ti_result['data']['{}'.format(query_ip)]['summary_info']['reputation']
            ti_judgments_malicious_label = ",".join(
                ti_result['data']['{}'.format(query_ip)]['summary_info']['malicious_label'])
            ti_judgments = ti_judgments_reputation + ":" + ti_judgments_malicious_label
            # 调用写excel函数将查询结果写到excel表格中
            ip_info_dict[query_ip] = {
                "location": ti_geo,
                "x_judgments": x_judgments,
                "x_update_time": x_update_time,
                "ti_judgments": ti_judgments,
                "ti_update_time": ti_update_time
            }
        else:
            print("x_result: ", x_result)
            print("ti_result: ", ti_result)
    return ip_info_dict


if __name__ == '__main__':
    script_path = os.path.dirname(__file__)
    # script_path = os.path.dirname(os.path.realpath(sys.executable))

    # 生成 iplist.txt 路径
    ip_file = os.path.join(script_path, 'iplist.txt')
    # 校验 iplist.txt 文件是否存在
    existsfile(ip_file)

    # 生成时间
    date_time = time.strftime('%Y%m%d%H%M%S', time.localtime())
    # 生成带有时间的 ipinfo.xlsx 文件
    info_file = os.path.join(script_path, 'ipinfo-{}.xlsx'.format(date_time))

    # 获取查询结果,需要是字典类型
    ip_result = get_result(ip_file)

    # 保存结果
    save_result(ip_result, info_file)

标签:批量,format,情报,ip,查询,result,file,ti,path
From: https://www.cnblogs.com/f-carey/p/18322843

相关文章

  • 批量打断相交线——ArcGIS 解决方法
    在数据处理,特别是地理空间数据处理或是任何涉及图形和线条分析的场景中,有时候需要把相交的线全部从交点打断一个常见的需求。这个过程对于后续的分析、编辑、或是可视化展现都至关重要,因为它可以确保每条线都是独立的,避免了因为线条重叠或相交而可能产生的错误或混淆。打断前......
  • 使用mybatis-plus拦截器MybatisPlusInterceptor进行分页查询案例
    在MyBatis-Plus中,分页功能通常是通过配置MybatisPlusInterceptor(或其前身PaginationInterceptor)来实现的,这是一个全局的拦截器,用于拦截MyBatis的SQL执行,并在其中添加分页逻辑。以下是一个使用MybatisPlusInterceptor进行分页查询的案例:添加依赖<dependencies><de......
  • SQL Server查询所有表格以及字段
    查询所有表格:selectconvert(varchar(64),s.id)asfRowId,s.nameasTableName,IsNull(cast(xp.[value]asnvarchar(4000)),s.name)asTableDesc,ModuleCode=CONVERT(varchar(16),casewhens.namelike't%'thenSUBSTRING(s.name,2,3)......
  • Pytorch Dataloader 添加批量维度
    我认为这个问题已经被问过几次了,但我还没有在这里找到一个好的答案。所以我有一个由2个numpy数组组成的Pytorch数据集。以下是维度。特征=[10000,450,28]numpy数组。dim_0=样本数,dim_1=时间序列,dim_2=特征。基本上我有一个450帧长的数据,其中每......
  • 百万级以上的数据查询的建议写法(转)
    处理百万级以上的数据提高查询速度的方法: 1.应尽量避免在where子句中使用!=或<>操作符,否则将引擎放弃使用索引而进行全表扫描。 2.对查询进行优化,应尽量避免全表扫描,首先应考虑在where及orderby涉及的列上建立索引。 3.应尽量避免在where子句中对字段进行null值......
  • DICOM格式转NII格式——SPM12批量码
     运行说明:subjectsdir换成自己的文件夹地址,有多少个受试者的DICOM就有多少个subjects子元素。 %-----------------------------------------------------------------------%Jobsavedon02-Sep-201918:21:16bycfg_util(rev$Rev:6942$)%spmSPM-SPM12(7219......
  • mp分页+批量 查询
    @OverridepublicPageDTOpageCoupons(UserCouponQueryquery){LonguserId=UserContext.getUser();userId=2L;LambdaQueryWrapper<UserCoupon>queryWrapper=newLambdaQueryWrapper<UserCoupon>().eq(UserCoupon::getUserId,userId);......
  • opengauss第十三步: 在主集群主机和备集群首备执行查询,可观察到流复制信息
    第十三步:在主集群主机和备集群首备执行查询,可观察到流复制信息主集群主节点0[omm@node1dn]$gs_ctlquery-D/opt/huawei/install/data/dn[2023-04-1809:38:34.397][1498175][][gs_ctl]:gs_ctlquery,datadiris/opt/huawei/install/data/dnHAstate:local......
  • 什么是回表查询,如何避免?
    回表查询(或称为回表操作)是指在数据库查询中,当一个索引不能包含查询所需的所有列时,数据库需要先通过索引查找到相关的记录位置(主键或行号),然后再回到表中读取完整的行数据。这种操作通常会影响查询性能,特别是在数据量较大的情况下。如何避免回表查询覆盖索引:创建包含所有查询列......
  • 在WPS的表格 里使用VBA,批量进行替换内容
    需求:在今日的工作过程中,发现有大量的内容需要从另外一个表格前两列里匹配进行替换。从编号替换成具体内容,但是有一些地方编号有多个用逗号连接在一起,需要先分隔开来。解决方案:找了很多方案,但是没有现成的公式能满足这个需求,简单的脚本无法满足,于是找了宏脚本,先录制一个简单的替换......