简单来说,EDA通常指的是数据集的任何初始处理。通常,这些是较小的数据集,是较大数据集的子集,但你也可以使用大数据执行EDA。在本文中,你将扮演SecOps分析师的角色,对EDR( Endpoint Detection and Response:端点检测和响应)数据的快照执行EDA,这些数据可能来自主流工具。
测试数据
对于SecOps分析师来说,这可能需要处理原始数据集,如网络日志、历史端点检测与响应(EDR)事件、身份验证或通用身份与访问管理(IAM)日志,或几乎任何其他相关的安全或IT可观察性数据点。
在本文中,你将扮演SecOps分析师的角色,对EDR( Endpoint Detection and Response:端点检测和响应)数据的快照执行EDA,这些数据可能来自主流工具。该数据丰富且上下文丰富,其中包含有关发现、文件、设备、所有者、位置、操作系统信息和其他几个数据点的信息。
示例数据请在这里下载,总体来说,这个 JSON 数据结构非常全面且详细地记录了一次与端点检测和响应相关的事件信息,从事件本身的基础情况、涉及的设备、文件、进程到各种描述、时间、风险状态等多方面进行了呈现,多条这样的记录组成的数据集可用于安全分析、威胁追踪、系统监控等众多相关场景。
要执行此分析,您将使用Python脚本和DuckDB(一种可移植的进程内分析数据库)来学习如何为EDA编写基本的SQL语句。
快速开始
要将脚本中的每个块作为notebook运行,选择run Cell选项执行单元格代码。一个交互窗口将自动出现在VSCode中。
# %%
import duckdb
LOCAL_JSON = "../data/synthetic_edr_data_with_process.json"
# %%
duckdb.sql(
f"""
select count(*) from read_json('{LOCAL_JSON}')
"""
).show()
# %%
查询结果:
┌──────────────┐
│ count_star() │
│ int64 │
├──────────────┤
│ 1000 │
└──────────────┘
如何返回数据量大,不利于我们查看分析,和所有事情一样,适度是关键。使用COUNT(*)是了解如何限制自己的一种信息丰富的方式,下面例子使用limit语句。LIMIT设置要从数据集检索的最大行数。作为一名人工分析师,使用LIMIT来减少EDA期间查看的数据量有助于防止在探索数据集时变得不堪重负。
# %%
duckdb.sql(
f"""
SELECT * FROM read_json('{LOCAL_JSON}')
LIMIT 10
"""
).show()
返回结果:
┌─────────┬───────────┬───────────────┬───┬──────────────────────┬──────────────────────┬──────────────────────┐
│ action │ action_id │ activity_name │ … │ device │ file │ process │
│ varchar │ int64 │ varchar │ │ struct(uid_alt var… │ struct(accessed_ti… │ struct(created_tim… │
├─────────┼───────────┼───────────────┼───┼──────────────────────┼──────────────────────┼──────────────────────┤
│ Denied │ 2 │ Close │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
│ Denied │ 2 │ Update │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
│ Denied │ 2 │ Update │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
│ Denied │ 2 │ Close │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
│ Denied │ 2 │ Close │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
│ Denied │ 2 │ Update │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
│ Denied │ 2 │ Close │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
│ Denied │ 2 │ Close │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
│ Denied │ 2 │ Update │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
│ Denied │ 2 │ Close │ … │ {'uid_alt': sensor… │ {'accessed_time': … │ {'created_time': 2… │
├─────────┴───────────┴───────────────┴───┴──────────────────────┴──────────────────────┴──────────────────────┤
│ 10 rows 31 columns (6 shown) │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
要强制行唯一性,请使用SELECT DISTINCT语句返回不同的(惟一的)值。在某些情况下,这有助于进一步减少返回的数据量,例如查询数据集中值得注意的唯一实例或指示符。例如,检索身份验证日志中用户的唯一名称,或者检索EDR数据集中设备的ip或uuid。
# %%
duckdb.sql(
f"""
SELECT DISTINCT * FROM read_json('{LOCAL_JSON}')
LIMIT 10
"""
).show()
在下面的查询中,您可以检索假想的EDR工具采样的每一对唯一的设备IP地址和可疑恶意软件的文件名。通过添加更多字段进行测试模拟,尽管存在差异,EDA仍然可以产生大量的结果。
duckdb.sql(
f"""
SELECT DISTINCT
device.ip,
file.name
FROM read_json('{LOCAL_JSON}')
LIMIT 10
"""
).show()
这里通过点号进行嵌套查询,返回结果:
┌─────────────────┬─────────────────────────┐
│ ip │ name │
│ varchar │ varchar │
├─────────────────┼─────────────────────────┤
│ 195.140.154.58 │ WMIGhost.zip │
│ 95.183.3.98 │ Artemis.zip │
│ 87.236.176.230 │ IllusionBot_May2007.zip │
│ 185.82.72.134 │ NYB.zip │
│ 205.210.31.48 │ Waski.zip │
│ 162.243.174.145 │ Shamoon.zip │
│ 184.105.247.250 │ Rustock.zip │
│ 143.198.122.77 │ SpyEye.zip │
│ 117.253.4.238 │ KRBanker.zip │
│ 205.210.31.212 │ Jumper.zip │
├─────────────────┴─────────────────────────┤
│ 10 rows 2 columns │
└───────────────────────────────────────────┘
可以将COUNT()等数学和统计操作符与SELECT DISTINCT语句结合使用,以提供不同值的总数。这可以进一步告知要在LIMIT语句中使用的行总数,或者用作过于简单的聚合机制。
# %%
duckdb.sql(
f"""
SELECT DISTINCT
COUNT(device.ip) as total_device_ips,
COUNT(file.name) as total_file_names
FROM read_json('{LOCAL_JSON}')
"""
).show()
返回结果:
┌──────────────────┬──────────────────┐
│ total_device_ips │ total_file_names │
│ int64 │ int64 │
├──────────────────┼──────────────────┤
│ 1000 │ 1000 │
└──────────────────┴──────────────────┘
聚合分析
一种简单的聚合类型是将特定字段与该字段中的值在数据集中出现的次数一起分组。要启动聚合,请按名称指定字段,然后再次使用COUNT()函数和AS语句创建别名。然后,使用GROUP BY语句将具有相同值的行分组为“摘要行”。在本例中,我们统计设备ip在数据集中出现的次数的聚合。
duckdb.sql(
f"""
SELECT
COUNT(device.ip) AS total_device_ips,
device.ip AS device_ip
FROM read_json('{LOCAL_JSON}')
GROUP BY device_ip
ORDER BY total_device_ips DESC
LIMIT 15
"""
).show()
返回结果:
┌──────────────────┬────────────────┐
│ total_device_ips │ device_ip │
│ int64 │ varchar │
├──────────────────┼────────────────┤
│ 2 │ 34.38.151.7 │
│ 2 │ 157.245.155.30 │
│ 2 │ 156.146.57.122 │
│ 2 │ 185.132.187.39 │
│ 2 │ 162.243.152.4 │
│ 2 │ 47.252.20.241 │
│ 2 │ 65.49.1.118 │
│ 2 │ 120.26.144.39 │
│ 2 │ 45.33.109.8 │
│ 2 │ 198.235.24.168 │
│ 2 │ 172.233.17.65 │
│ 2 │ 178.128.84.112 │
│ 2 │ 118.31.13.222 │
│ 2 │ 64.227.10.228 │
│ 2 │ 87.236.176.111 │
├──────────────────┴────────────────┤
│ 15 rows 2 columns │
└───────────────────────────────────┘
查询EDR数据集中的特定设备IP。您可以任意重写谓词以搜索其他值。
# %%
duckdb.sql(
f"""
SELECT DISTINCT
*
FROM read_json('{LOCAL_JSON}')
WHERE device.ip = '188.166.30.169'
"""
).show()
在下面的示例中,EDR数据中需要几个字段:主机名、用户名、SHA-256哈希值、文件路径和文件名。这些字段仅在标准化严重性为致命或关键并且EDR发现正在积极工作(“正在进行中”)时返回。
# %%
duckdb.sql(
f"""
SELECT DISTINCT
device.hostname AS hostname,
device.owner.name AS username,
file.sha256_hASh_data.value AS sha256,
file.path AS file_path,
file.name AS filename
FROM read_json('{LOCAL_JSON}')
WHERE severity = 'Fatal'
OR severity = 'Critical'
AND status = 'In Progress'
"""
).show()
如果要执行不太具体的审计以检索数据集中特定事件的总数,则可以将此过滤与更多函数(如COUNT())结合使用。例如,查找正在进行的致命或严重程度的发现的总数。
# %%
duckdb.sql(
f"""
SELECT DISTINCT
COUNT(*) AS total_matching_findings
FROM read_json('{LOCAL_JSON}')
WHERE severity = 'Fatal'
OR severity = 'Critical'
AND status = 'In Progress'
"""
).show()
也可以根据字段分组统计:
# %%
duckdb.sql(
f"""
SELECT DISTINCT
severity,
status,
COUNT(*) AS alert_COUNT
FROM read_json('{LOCAL_JSON}')
GROUP BY severity, severity_id, status
ORDER BY alert_COUNT DESC
"""
).show()
对统计结果进行过滤:
# %%
duckdb.sql(
f"""
SELECT
device.hostname AS host,
device.type AS device_type,
COUNT(*) AS alert_count
FROM read_json('{LOCAL_JSON}')
WHERE status != 'Suppressed' GROUP BY device.hostname, device.type
HAVING alert_count > 3
"""
).show()
在本节中,展示如何使用GROUP BY语句和ORDER BY子句进一步聚合和排序EDA结果。此外,还学习了使用WHERE和HAVING子句来过滤数据集,分别用于基本过滤和特定于聚合的过滤。最后通过使用布尔OR和and操作符在谓词中组合多个过滤器。在下一节中,将学习如何在SQL中使用时间戳,以及如何使用基本模式匹配扩展过滤。
时间戳和模式匹配
数据集中有至少一个有效的TIMESTAMP SQL数据类型。接下来,使用DATE_TRUNC()函数指定要在其上创建截断的间隔和时间戳字段。在本例中,你将在未被阻止的每小时警报上创建聚合,并根据每小时警报数量进行排序。对于更小的时间截断聚合,你也可以将小时替换为日。
# %%
duckdb.sql(
f"""
SELECT
COUNT(*) AS alert_count,
DATE_TRUNC('hour', time) AS event_hour
FROM read_json('{LOCAL_JSON}')
WHERE status != 'Suppressed'
GROUP BY event_hour
ORDER BY alert_count DESC
"""
).show()
执行基于时间的分析的另一种方法是使用EXTRACT()函数从时间戳中提取特定的间隔组件,例如特定的小时、日、月和/或年。您将获得提取的整数,而不是使用截断的时间戳作为别名字段值。这可能有助于根据用例提高可读性和/或实用性。
在下面的示例中,使用EXTRACT()对日、月和年时间间隔进行聚合,而不是前面执行的小时聚合示例。
# %%
duckdb.sql(
f"""
SELECT
EXTRACT(day FROM time) AS event_day,
EXTRACT(month FROM time) AS event_month,
EXTRACT(year FROM time) AS event_year,
COUNT(*) AS alert_count
FROM read_json('{LOCAL_JSON}')
WHERE status != 'Suppressed'
GROUP BY event_day, event_month, event_year
ORDER BY event_day DESC
"""
).show()
LIKE操作符本身的行为类似于不使用特殊符号的等号操作符。下划线(_)匹配任何单个字符,百分号(%)匹配任何0个或多个字符的序列。例如,请参考以下来自DuckDB文档的SQL示例,这些示例展示了LIKE操作符的功能。真正的注释表示匹配。
SELECT 'abc' LIKE 'abc'; -- true
SELECT 'abc' LIKE 'a%' ; -- true
SELECT 'abc' LIKE '_b_'; -- true
SELECT 'abc' LIKE 'c'; -- false
SELECT 'abc' LIKE 'c%' ; -- false
SELECT 'abc' LIKE '%c'; -- true
SELECT 'abc' NOT LIKE '%c'; -- false
在下面的示例中,检索EDR数据集中的资产和潜在恶意软件文件名,其中ssh位于文件路径中。这可以表示通过远程SSH会话下载的恶意软件,或者SSH会话被打开到前向阶段的命令与控制(C2)节点以检索恶意软件。使用LIKE和模式匹配可以证明有助于寻找特定的交易技术,并可用于向聚合中添加进一步的规范。
# %%
duckdb.sql(
f"""
SELECT
device.hostname AS hostname,
device.ip AS device_ip,
device.agent.uid AS agent_id,
file.name AS filename
FROM read_json('{LOCAL_JSON}')
WHERE file.path LIKE '%ssh%'
ORDER BY hostname ASC
"""
).show()
下面的示例使用否定(NOT LIKE)和不区分大小写的模式匹配(ILIKE)来进一步过滤EDR数据集。在这种情况下,任何以“C”开头的路径(例如在Windows操作系统中)都被排除在外,但是任何提到“shadow”的MITRE技术名称都包含在搜索中。这不是最好的实际演示,你能找到一个更好的方法来使用否定或大小写不敏感吗?
# %%
duckdb.sql(
f"""
SELECT
device.hostname AS hostname,
device.ip AS device_ip,
device.agent.uid AS agent_id,
file.name AS filename
FROM read_json('{LOCAL_JSON}')
WHERE file.path NOT LIKE '%C%'
AND finding_info.attack_technique.name ILIKE '%shadow%'
ORDER BY hostname ASC
"""
).show()
条件逻辑,统计和趋势分析
使用CASE表达式实现这种条件逻辑,其操作类似于if-then-else语句,例如Python中的if、elif和else条件循环。有关特定于DuckDB的实现的更多详细信息,请参阅DuckDB文档中的CASE Statement部分。
在下面的示例中,根据设备上安装的CentOS Linux发行版的特定版本使用CASE表达式创建一个概念上的impact_level字段。每当在CASE表达式中引用字段时,它们也必须包含在GROUP BY语句中。
# %%
duckdb.sql(
f"""
SELECT DISTINCT
device.hostname AS hostname,
device.ip AS device_ip,
CASE
WHEN device.os.name = 'CentOS' AND device.os.version = '8' THEN 'High Impact'
WHEN device.os.name = 'CentOS' AND device.os.version = '8.2' THEN 'Medium Impact'
WHEN device.os.name = 'CentOS' AND device.os.version = '9' THEN 'Low Impact'
ELSE 'No Impact'
END AS impact_level
FROM read_json('{LOCAL_JSON}')
WHERE device.os.name = 'CentOS'
GROUP BY device.hostname, device.ip, device.os.name, device.os.version
ORDER BY hostname ASC
"""
).show()
要避免在聚合中出现空值,或者为空值提供占位符值(类似于前面的CASE表达式示例),请考虑使用COALESCE()函数。COALESCE()函数可以接受任意数量的参数,比如字段名,并返回它找到的第一个非空值。类似于CASE表达式的ELSE子句,如果所有其他参数都返回null,您可以提供一个“回退”值。
在下面的示例中,COALESCE()函数的使用与上面的示例类似,当设备中的所有标识符都存在时填充一个值。owner JSON对象为空。
# %%
duckdb.sql(
f"""
SELECT DISTINCT
device.hostname AS hostname,
COALESCE(device.owner.uid, device.owner.email_addr, device.owner.domain, 'Unknown') AS device_owner
FROM read_json('{LOCAL_JSON}')
ORDER BY hostname ASC
"""
).show()
在EDA期间使用SQL中的统计函数可以用于评估登录频率、从给定源连接到特定主机的时间量,或者用作网络、文件系统、进程或身份日志源中简单异常检测的基础。您可以将其中的几个组合在一起,以创建日志中某些字段计数的平均值,以生成要使用的整数。
在下面的示例中,在各种统计函数中使用了风险和严重性的归一化值。这不是最好的用例,但是MIN()和MAX()函数可以用来确定字段值的极限上限和外部边界。
# %%
duckdb.sql(
f"""
SELECT
AVG(risk_level_id) AS avg_risk,
AVG(severity_id) AS avg_sev,
MAX(risk_level_id) AS max_risk,
MAX(severity_id) AS max_sev,
MIN(risk_level_id) AS min_risk,
MIN(severity_id) AS min_sev
FROM read_json('{LOCAL_JSON}')
"""
).show()
为了结束介绍性EDA教程,最后要简要介绍的函数是窗口函数,它支持跨特定的数据行执行计算,同时保持其他行的可见性。换句话说,窗口函数允许你“放大”每个事件,同时查看围绕它们的所有其他事件的大局或主题,因为它们不像groupby子句那样使用“汇总行”。
例如,可以使用窗口函数来显示失败的登录尝试如何与特定用户或IP地址的尝试相匹配。它对于识别潜在的异常或至少在EDA期间发现趋势非常有用。有关DuckDB中窗口函数的更多信息和一般示例,请参阅DuckDB文档中的窗口函数部分。
一种类型的窗口函数是ROW_NUMBER()函数,它为结果集中的每一行分配唯一的顺序值,从1开始。这对于创建行排序是有益的,并且可以与另一个SQL子句PARTITION BY配合使用。分区是一组行,允许您查看它们并在分区的子集上执行计算操作。
要使用ROW_NUMBER(),您需要将它与OVER()子句配合使用,该子句定义了窗口函数的实际窗口。使用PARTITION BY是可选的,但是你应该使用ORDER BY对窗口函数进行排序。
在下面的SQL查询中,将从EDR数据集请求几个值得注意的字段,并基于按事件时间排序的分区设置文件名上的窗口函数。与其他SQL函数一样,输出可以使用AS关键字别名,并在另一个ORDER BY子句中进一步使用。
# %%
duckdb.sql(
f"""
SELECT DISTINCT
device.hostname AS hostname,
finding_info.title as finding_title,
finding_info.attack_technique.uid as attack_technique_id,
file.name as filename,
ROW_NUMBER() OVER (
PARTITION BY
file.name
ORDER BY time
) AS malware_count
FROM read_json('{LOCAL_JSON}')
ORDER BY malware_count DESC limit 8
"""
).show()
返回结果:
┌──────────────────────┬────────────────────────────┬─────────────────────┬────────────────────────────┬───────────────┐
│ hostname │ finding_title │ attack_technique_id │ filename │ malware_count │
│ varchar │ varchar │ varchar │ varchar │ int64 │
├──────────────────────┼────────────────────────────┼─────────────────────┼────────────────────────────┼───────────────┤
│ LAX9-098 │ Potential critical file … │ T1004 │ CryptoLocker_10Sep2013.zip │ 19 │
│ rhel-db-prod-167 │ Potential critical file … │ T1025 │ AndroRat_6Dec2013.zip │ 19 │
│ debian-app-dev-194 │ Potential informational … │ T1061 │ Telefonica.zip │ 18 │
│ centos-db-dev-017 │ Potential critical file … │ T1153 │ CryptoLocker_10Sep2013.zip │ 18 │
│ CHI5-247 │ Potential critical file … │ T1649 │ AndroRat_6Dec2013.zip │ 18 │
│ LAX9-003 │ Potential critical file … │ T1057 │ Dino.zip │ 17 │
│ ubuntu-store-test-… │ Potential informational … │ T1039 │ Telefonica.zip │ 17 │
│ LAP-162 │ Potential critical file … │ T1198 │ CryptoLocker_10Sep2013.zip │ 17 │
└──────────────────────┴────────────────────────────┴─────────────────────┴────────────────────────────┴───────────────┘
在本节中,您学习了如何使用CASE表达式和COALESCE()函数实现条件if/else逻辑,同时还使用后者处理空值。您了解了如何使用其他基本统计函数,如AVG()、MIN()和MAX(),以及它们如何与COUNT()函数配合使用。最后,学习了如何使用ROW_NUMBER()窗口函数和PARTITION BY子句在实际的数据行上创建聚合,而不仅仅是汇总行。
总结
在本博客中,介绍了使用SQL执行探索性数据分析。让你了解如何使用DuckDB(一种内存中的向量化分析数据库)对存储在JSON文件中的半结构化数据执行SQL语句。最后,学习了如何实现十几种不同的SQL函数、语句、操作和子句,以处理各种数据点和数据类型。
标签:duckdb,探索性,JSON,DuckDB,file,time,device,SELECT From: https://blog.csdn.net/neweastsun/article/details/144592773