key = "code execution"
if key not in _local_cache:
_local_cache[key] = next(_[1] for _ in SUSPICIOUS_HTTP_REQUEST_REGEXES if "code execution" in _[0])
if re.search(_local_cache[key], unquoted_path, re.I) is None: # NOTE: to prevent malware domain FPs in case of outside scanners
url = path.split("://", 1)[1]
if '/' not in url:
url = "%s/" % url
host, path = url.split('/', 1)
if host.endswith(":80"):
host = host[:-3]
path = "/%s" % path
proxy_domain = host.split(':')[0]
_check_domain(proxy_domain, sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, packet)
elif method == "CONNECT":
if '/' in path:
host, path = path.split('/', 1)
path = "/%s" % path
else:
host, path = path, '/'
if host.endswith(":80"):
host = host[:-3]
url = "%s%s" % (host, path)
proxy_domain = host.split(':')[0]
_check_domain(proxy_domain, sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, packet)
if url is None:
url = "%s%s" % (host, path)
if config.USE_HEURISTICS:
user_agent, result = None, None
first_index = tcp_data.find("\r\nUser-Agent:")
if first_index >= 0:
first_index = first_index + len("\r\nUser-Agent:")
last_index = tcp_data.find("\r\n", first_index)
if last_index >= 0:
user_agent = tcp_data[first_index:last_index]
user_agent = _urllib.parse.unquote(user_agent).strip()
if user_agent:
result = _result_cache.get((CACHE_TYPE.USER_AGENT, user_agent))
if result is None:
if re.search(WHITELIST_UA_REGEX, user_agent, re.I) is None:
match = re.search(SUSPICIOUS_UA_REGEX, user_agent)
if match and match.group(0):
def _(value):
    """Return *value* with trailing backslashes removed and parentheses escaped.

    Each literal '(' and ')' is prefixed with a backslash; the surrounding
    code wraps fragments in its own unescaped parentheses when building the
    result string.
    """
    escaped = value.rstrip('\\')
    for paren in ('(', ')'):
        escaped = escaped.replace(paren, "\\" + paren)
    return escaped
parts = user_agent.split(match.group(0), 1)
if len(parts) > 1 and parts[0] and parts[-1]:
result = _result_cache[(CACHE_TYPE.USER_AGENT, user_agent)] = "%s (%s)" % (_(match.group(0)), _(user_agent))
else:
result = _result_cache[(CACHE_TYPE.USER_AGENT, user_agent)] = _(match.group(0)).join(("(%s)" if part else "%s") % _(part) for part in parts)
if not result:
_result_cache[(CACHE_TYPE.USER_AGENT, user_agent)] = False
if result:
log_event((sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, TRAIL.UA, result, "user agent (suspicious)", "(heuristic)"), packet)
if not _check_domain_whitelisted(host):
path = path.replace("//", '/')
unquoted_path = _urllib.parse.unquote(path)
unquoted_post_data = _urllib.parse.unquote(post_data or "")
checks = [path.rstrip('/')]
if '?' in path:
checks.append(path.split('?')[0].rstrip('/'))
if '=' in path:
checks.append(path[:path.index('=') + 1])
_ = re.sub(r"(\w+=)[^&=]+", r"\g<1>", path)
if _ not in checks:
checks.append(_)
if _.count('/') > 1:
checks.append("/%s" % _.split('/')[-1])
elif post_data:
checks.append("%s?%s" % (path, unquoted_post_data.lower()))
if checks[-1].count('/') > 1:
checks.append(checks[-1][:checks[-1].rfind('/')])
checks.append(checks[0][checks[0].rfind('/'):].split('?')[0])
for check in filter(None, checks):
for _ in ("", host):
check = "%s%s" % (_, check)
if check in trails:
if '?' not in path and '?' in check and post_data:
trail = "%s(%s \\(%s %s\\))" % (host, path, method, post_data.strip())
log_event((sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, TRAIL.HTTP, trail, trails[check][0], trails[check][1]))
else:
parts = url.split(check)
other = ("(%s)" % _ if _ else _ for _ in parts)
trail = check.join(other)
log_event((sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, TRAIL.URL, trail, trails[check][0], trails[check][1]))
return
if "%s/" % host in trails:
trail = "%s/" % host
log_event((sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, TRAIL.URL, trail, trails[trail][0], trails[trail][1]))
return
if config.USE_HEURISTICS:
match = re.search(r"\b(CF-Connecting-IP|True-Client-IP|X-Forwarded-For):\s*([0-9.]+)".encode(), packet, re.I)
if match:
src_ip = "%s,%s" % (src_ip, match.group(1))
for char in SUSPICIOUS_HTTP_REQUEST_FORCE_ENCODE_CHARS:
replacement = SUSPICIOUS_HTTP_REQUEST_FORCE_ENCODE_CHARS[char]
path = path.replace(char, replacement)
if post_data:
post_data = post_data.replace(char, replacement)
if not any(_ in unquoted_path.lower() for _ in WHITELIST_HTTP_REQUEST_PATHS):
if any(_ in unquoted_path for _ in SUSPICIOUS_HTTP_REQUEST_PRE_CONDITION):
found = _result_cache.get((CACHE_TYPE.PATH, unquoted_path))
if found is None:
for desc, regex in SUSPICIOUS_HTTP_REQUEST_REGEXES:
if re.search(regex, unquoted_path, re.I | re.DOTALL):
found = desc
break
_result_cache[(CACHE_TYPE.PATH, unquoted_path)] = found or ""
if found and not ("data leakage" in found and is_local(dst_ip)):
trail = "%s(%s)" % (host, path)
log_event((sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, TRAIL.URL, trail, "%s (suspicious)" % found, "(heuristic)"), packet)
return
if any(_ in unquoted_post_data for _ in SUSPICIOUS_HTTP_REQUEST_PRE_CONDITION):
found = _result_cache.get((CACHE_TYPE.POST_DATA, unquoted_post_data))
if found is None:
for desc, regex in SUSPICIOUS_HTTP_REQUEST_REGEXES:
if re.search(regex, unquoted_post_data, re.I | re.DOTALL):
found = desc
break
_result_cache[(CACHE_TYPE.POST_DATA, unquoted_post_data)] = found or ""
if found:
trail = "%s(%s \\(%s %s\\))" % (host, path, method, post_data.strip())
log_event((sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, TRAIL.HTTP, trail, "%s (suspicious)" % found, "(heuristic)"), packet)
return
if '.' in path:
_ = _urllib.parse.urlparse("http://%s" % url) # dummy scheme
path = path.lower()
filename = _.path.split('/')[-1]
name, extension = os.path.splitext(filename)
trail = "%s(%s)" % (host, path)
if extension in SUSPICIOUS_DIRECT_DOWNLOAD_EXTENSIONS and not is_local(dst_ip) and not any(_ in path for _ in WHITELIST_DIRECT_DOWNLOAD_KEYWORDS) and '=' not in _.query and len(name) < 10:
log_event((sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, TRAIL.URL, trail, "direct %s download (suspicious)" % extension, "(heuristic)"), packet)
else:
for desc, regex in SUSPICIOUS_HTTP_PATH_REGEXES:
if re.search(regex, filename, re.I):
log_event((sec, usec, src_ip, src_port, dst_ip, dst_port, PROTO.TCP, TRAIL.URL, trail, "%s (suspicious)" % desc, "(heuristic)"), packet)
break
各行代码功能及函数总体功能
在函数 `_process_packet` 中,这段代码的功能是解析HTTP请求,并进行一系列的分析和日志记录。这段代码是函数的一部分,用于处理捕获到的单个原始IP层数据包。
代码中的 `try` 块尝试执行以下操作:
1. **解析HTTP请求**:
- `if tcp_data.startswith("HTTP/"):`: 如果TCP数据以 "HTTP/" 开头,说明这是一条HTTP响应(状态行以协议版本开头),而不是请求。
- `if " HTTP/" in tcp_data:`: 如果TCP数据中包含 " HTTP/",则解析HTTP请求。
- `if method and path:`: 如果解析出HTTP方法(如GET或POST)和路径,则继续处理。
2. **解析URL和POST数据**:
- `url = path.split("://", 1)[1]`: 解析URL。
- `host, path = url.split('/', 1)`: 解析主机名和路径。
- `if host.endswith(":80"):`: 如果主机名以 ":80" 结尾,则去除该部分。
- `if "://" in path:`: 如果路径中包含 "://"(即请求行中携带的是完整URL),则去掉协议前缀,并从中拆分出主机名和路径。
3. **检查域名是否在白名单中**:
- `if not _check_domain_whitelisted(host):`: 如果域名不在白名单中,则继续执行后续检查;白名单中的域名会被跳过。
4. **检查URL和POST数据**:
- `checks = [path.rstrip('/')]`: 创建一个检查列表,包括路径和可能的查询字符串。
- `for check in filter(None, checks):`: 遍历检查列表,检查每个元素是否在轨迹中。
- `if check in trails:`: 如果检查的元素在轨迹中,则记录事件。
5. **检查文件扩展名**:
- `filename = _.path.split('/')[-1]`: 解析文件名。
- `name, extension = os.path.splitext(filename)`: 解析文件名和扩展名。
- `trail = "%s(%s)" % (host, path)`: 创建一个轨迹描述。
- `if extension in SUSPICIOUS_DIRECT_DOWNLOAD_EXTENSIONS and not is_local(dst_ip) and not any(_ in path for _ in WHITELIST_DIRECT_DOWNLOAD_KEYWORDS) and '=' not in _.query and len(name) < 10:`: 如果文件扩展名在可疑直接下载扩展名列表中、目标IP不是本地地址、路径中不包含任何白名单下载关键字、查询字符串中没有 '=',并且文件名(不含扩展名)长度小于10,则记录事件。
6. **检查路径**:
- `for desc, regex in SUSPICIOUS_HTTP_PATH_REGEXES:`: 遍历可疑HTTP路径正则表达式列表。
- `if re.search(regex, filename, re.I):`: 如果文件名与正则表达式匹配,则记录事件。
函数的总体功能是解析HTTP请求,并执行一系列的分析和日志记录。这包括解析URL和POST数据、检查域名是否在轨迹中、检查文件扩展名和路径,以及记录可疑活动。如果检测到可疑活动,函数会记录事件,并将相关信息存储在轨迹信息中。