使用 Python 操作 Splunk
目录1 参考文档
Splunk Github
地址: GitHub - splunk/splunk-sdk-python: Splunk Software Development Kit for PythonSplunk
开发者文档地址:Python tools | Documentation | Splunk Developer Program
2 安装 Python Splunk-SDK
当出现如下提示时,需要安装Splunk-SDK
:
ModuleNotFoundError: No module named 'splunklib'
# 解决方式
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple splunk-sdk
# 若依然提示相同问题,需要再安装 splunklib
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple splunklib
3 连接 splunk
-
使用
splunklib.client
模块连接Splunk
. -
脚本如下:
import splunklib.client as client HOST = "localhost" PORT = 8089 USERNAME = "admin" PASSWORD = "yourpassword" # 创建服务实例并登录 service = client.connect( host=HOST, port=PORT, username=USERNAME, password=PASSWORD) # 在console中打印安装的apps以确认连接成功 for app in service.apps: print app.name
注 以上认证方式只在Enterprise Splunk 和Enterprise Free Splunk中有效。Free Splunk版本不带认证,需要配置如下:
HOST = "localhost"
PORT = 8089
USERNAME = "admin"
PASSWORD = ""
4 配置查询
- Splunk用4种查询方式。此处我只创建一个快速查询
One-Shot search
. 即不检查查询语句是否完整便开始搜索,最后等待响应结果。 - 脚本如下:
#!/usr/bin/python3
import splunklib.client as client
import splunklib.results as results
HOST = "127.0.0.1"
PORT = 8089
USERNAME = "admin"
PASSWORD = ""
# 创建服务实例并登录
service = client.connect(
host=HOST,
port=PORT,
username=USERNAME,
password=PASSWORD)
# 设置查询语句
# 查询5分钟前到此刻的数据
kwargs_oneshot = {"earliest_time": "-5m@m", "latest_time": "now"}
# 设置查询语句
searchquery_oneshot = 'search index="index_netsec" (host="192.168.1.2" "Web-security@SYS" NOT (AttackName="信息泄露" OR AttackName= "检测curl网络爬虫")) OR (host="192.168.1.3" "192.168.1.4") | stats count by src_ip'
# 运行并保存结果
# 运行并保存运行结果,以Json格式输出数据
oneshotsearch_results = service.jobs.oneshot(searchquery_oneshot, **kwargs_oneshot, output_mode='json')
# 使用JSONResultsReader方法获取数据
reader = results.JSONResultsReader(oneshotsearch_results)
# Get the results and display them using the for result in reader:
if isinstance(result, results.Message):
# 调试数据使用会话形式展示
print(f'{result.type}: {result.message}')
elif isinstance(result, dict):
# Normal events are returned as dicts
iplist.append('ip {}/32'.format(result['src_ip'].strip()))
print(iplist)