Taking Information Gathering Further
Introduction
The previous post introduced some information-gathering websites and tools. This one focuses on how to gather information semi-automatically: fully automated collection tends to produce dirty data, while doing everything by hand is far too slow, so to speed things up we lean on a few scripts and tools.
Semi-Automated WHOIS Collection
The script is built around the WhoisXML reverse-WHOIS API, documented at https://reverse-whois.whoisxmlapi.com/api/documentation/making-requests.
Copy the sample request into Burp, change the request method, and switch the body to JSON format.
Then copy the data into the request body.
Running the script returns the query results.
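At this stage the script only needs to send one hard-coded query and print whatever comes back. A minimal sketch of that first version (the organization name and the YOUR_API_KEY placeholder are illustrative; the payload fields match the final script further down):

import requests

# Minimal first version: one hard-coded organization per run, raw output only
url = "https://reverse-whois.whoisxmlapi.com/api/v2"
payload = {
    "apiKey": "YOUR_API_KEY",            # your WhoisXML API key
    "searchType": "current",
    "mode": "purchase",
    "punycode": True,
    "basicSearchTerms": {
        "include": ["TESLA, INC."],      # the organization to search for
        "exclude": []
    }
}

# json= serializes the body and sets the Content-Type header for us
response = requests.post(url, json=payload)
print(response.json())                   # the response carries a "domainsList" array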
But that clearly isn't enough: we still need to extract the domain names into a new text file.
That makes the next round of testing much easier. Can it be optimized further? As the code stands, the organization name has to be edited by hand in the script for every WHOIS query, which is inconvenient. Instead, the organization names can be kept in a separate file and read from it each time. Here is the final script:
import requests
import json

# Read the organization names from orgs.txt
def load_organizations(file_path):
    try:
        with open(file_path, "r") as file:
            # Read line by line, stripping whitespace and skipping blank lines
            organizations = [line.strip() for line in file if line.strip()]
        return organizations
    except FileNotFoundError:
        print(f"Error: file {file_path} not found.")
        return []

# Request URL and headers
url = "https://reverse-whois.whoisxmlapi.com/api/v2"
headers = {
    "Accept-Encoding": "gzip, deflate, br",
    "Accept": "*/*",
    "Accept-Language": "en-US;q=0.9,en;q=0.8",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.71 Safari/537.36",
    "Cache-Control": "max-age=0",
    "Content-Type": "application/json",
}

# Load the organization names from orgs.txt
orgs_file_path = "orgs.txt"
organizations = load_organizations(orgs_file_path)

if not organizations:
    print("No organization names to query; exiting.")
else:
    # Request body
    data = {
        "apiKey": "at_JqV1QomuZ0wKKG8tVc9VBkr2g0Kdp",
        "searchType": "current",
        "mode": "purchase",
        "punycode": True,
        "basicSearchTerms": {
            "include": organizations,
            "exclude": []
        }
    }
    try:
        # Send the POST request
        response = requests.post(url, headers=headers, data=json.dumps(data))
        # Check the response status
        if response.status_code == 200:
            print("Request succeeded!")
            # Parse the JSON response
            result = response.json()
            # Extract domainsList
            domains_list = result.get("domainsList", [])
            if domains_list:
                # Write domainsList to whois.txt
                with open("whois.txt", "w") as file:
                    for domain in domains_list:
                        file.write(domain + "\n")
                print(f"Wrote {len(domains_list)} domains to whois.txt!")
            else:
                print("No domainsList found in the response, or it is empty.")
        else:
            print(f"Request failed, status code: {response.status_code}")
            print(response.text)
    except requests.RequestException as e:
        print(f"Error while sending the request: {e}")
Semi-Automated Information Gathering via Certificates
This is what a normal query against the site looks like in the browser; the field we need is common name.
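The same query can also be requested as JSON by adding the output parameter, e.g. https://crt.sh/?q=TESLA%2C+INC.&output=json, which is what the script below relies on; each certificate comes back as a JSON object, and common_name is the only field we use. An illustrative (made-up) entry, with the other fields omitted:

{
    "common_name": "www.tesla.com",
    ...
}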
We then extract the common_name values into a cert.txt file, but this approach has a couple of issues:
1. Not every common_name value is a domain we want; some are organization names and some are duplicates, so the results need further extraction and de-duplication.
2. In practice the connection sometimes times out, so we set a request timeout and retry the request a few times.
The final script is below; the results are written to cert.txt.
import requests
import tldextract

# Request URL and parameters
url = "https://crt.sh/"
params = {
    "q": "TESLA, INC.",
    "output": "json"
}
headers = {
    "Accept-Encoding": "gzip, deflate, br",
    "Accept": "*/*",
    "Accept-Language": "en-US;q=0.9,en;q=0.8",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.71 Safari/537.36",
    "Cache-Control": "max-age=0"
}

def extract_root_domain(domain):
    """Extract the root domain (e.g. 'example.com' from 'www.example.com')."""
    extracted = tldextract.extract(domain)
    if extracted.domain and extracted.suffix:
        return f"{extracted.domain}.{extracted.suffix}"
    return None

def fetch_data_with_retries(url, headers, params, max_retries=3, timeout=120):
    """Request the data, retrying up to max_retries times when the status code is not 200."""
    attempt = 0
    while attempt < max_retries:
        try:
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            if response.status_code == 200:
                return response
            else:
                print(f"Request failed, status code: {response.status_code}. Retrying ({attempt + 1}/{max_retries})...")
        except requests.RequestException as e:
            print(f"Error while sending the request: {e}. Retrying ({attempt + 1}/{max_retries})...")
        attempt += 1
    return None

try:
    # Try to fetch the data
    response = fetch_data_with_retries(url, headers, params)
    if response and response.status_code == 200:
        print("Request succeeded!")
        try:
            # Try to parse the JSON response
            data = response.json()
            # Pull the domain out of each common_name and reduce it to its root domain
            domains = set()  # use a set to de-duplicate
            for entry in data:
                common_name = entry.get("common_name") or ""  # guard against missing/null values
                root_domain = extract_root_domain(common_name)
                if root_domain:  # extraction succeeded
                    domains.add(root_domain)
            # Write the de-duplicated root domains to a file
            if domains:
                with open("cert.txt", "w") as file:
                    for domain in sorted(domains):  # write in alphabetical order
                        file.write(domain + "\n")
                print(f"Wrote {len(domains)} root domains to cert.txt!")
            else:
                print("No valid domains found.")
        except ValueError:
            print("The response is not valid JSON.")
            print(response.text)
    else:
        print("All retries failed; could not fetch the data.")
except requests.Timeout:
    print("Request timed out! Please try again later.")
except requests.RequestException as e:
    print(f"Error while sending the request: {e}")
Merging
The two scripts can be merged to speed things up further.
The two output files can also be merged and de-duplicated.
The final code is as follows:
import requests
import json
import tldextract

# Shared request headers
headers = {
    "Accept-Encoding": "gzip, deflate, br",
    "Accept": "*/*",
    "Accept-Language": "en-US;q=0.9,en;q=0.8",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.71 Safari/537.36",
    "Cache-Control": "max-age=0",
    "Content-Type": "application/json",  # needed for the JSON POST in handle_reverse_whois(); harmless for the GET
}

# Extract the root domain
def extract_root_domain(domain):
    """Extract the root domain (e.g. 'example.com' from 'www.example.com')."""
    extracted = tldextract.extract(domain)
    if extracted.domain and extracted.suffix:
        return f"{extracted.domain}.{extracted.suffix}"
    return None

# Send a request with retry logic
def fetch_data_with_retries(url, headers, params=None, data=None, method="GET", max_retries=3, timeout=120):
    """Request the data, retrying up to max_retries times when the status code is not 200."""
    attempt = 0
    while attempt < max_retries:
        try:
            if method.upper() == "GET":
                response = requests.get(url, headers=headers, params=params, timeout=timeout)
            elif method.upper() == "POST":
                response = requests.post(url, headers=headers, data=json.dumps(data), timeout=timeout)
            else:
                raise ValueError("Unsupported HTTP method.")
            if response.status_code == 200:
                return response
            else:
                print(f"Request failed, status code: {response.status_code}. Retrying ({attempt + 1}/{max_retries})...")
        except requests.RequestException as e:
            print(f"Error while sending the request: {e}. Retrying ({attempt + 1}/{max_retries})...")
        attempt += 1
    return None

# Task 1: reverse-whois API request
def handle_reverse_whois():
    url = "https://reverse-whois.whoisxmlapi.com/api/v2"
    data = {
        "apiKey": "at_JqV1QomuZ0wKKG8tVc9VBkr2g0Kdp",
        "searchType": "current",
        "mode": "purchase",
        "punycode": True,
        "basicSearchTerms": {
            "include": ["Airbnb, Inc."],
            "exclude": []
        }
    }
    response = fetch_data_with_retries(url, headers, data=data, method="POST")
    if response and response.status_code == 200:
        print("Reverse WHOIS request succeeded!")
        result = response.json()
        domains_list = result.get("domainsList", [])
        if domains_list:
            with open("whois.txt", "w", encoding="utf-8") as file:
                for domain in domains_list:
                    file.write(domain + "\n")
            print(f"Wrote {len(domains_list)} domains to whois.txt!")
        else:
            print("No domainsList found in the response, or it is empty.")
    else:
        print("Reverse WHOIS request failed; could not fetch the data.")

# Task 2: crt.sh request
def handle_crt_sh():
    url = "https://crt.sh/"
    params = {
        "q": "TESLA, INC.",
        "output": "json"
    }
    response = fetch_data_with_retries(url, headers, params=params, method="GET")
    if response and response.status_code == 200:
        print("crt.sh request succeeded!")
        try:
            data = response.json()
            domains = set()
            for entry in data:
                common_name = entry.get("common_name") or ""  # guard against missing/null values
                root_domain = extract_root_domain(common_name)
                if root_domain:
                    domains.add(root_domain)
            if domains:
                with open("cert.txt", "w", encoding="utf-8") as file:
                    for domain in sorted(domains):
                        file.write(domain + "\n")
                print(f"Wrote {len(domains)} root domains to cert.txt!")
            else:
                print("No valid domains found.")
        except ValueError:
            print("The crt.sh response is not valid JSON.")
            print(response.text)
    else:
        print("crt.sh request failed; could not fetch the data.")

# Task 3: merge the files and de-duplicate
def merge_files_and_deduplicate(file1, file2, output_file):
    try:
        unique_domains = set()
        for file in [file1, file2]:
            with open(file, "r", encoding="utf-8") as f:
                for line in f:
                    domain = line.strip()
                    if domain:
                        unique_domains.add(domain)
        # Write the de-duplicated result
        with open(output_file, "w", encoding="utf-8") as f:
            for domain in sorted(unique_domains):
                f.write(domain + "\n")
        print(f"Wrote the merged, de-duplicated result to {output_file}: {len(unique_domains)} records.")
    except Exception as e:
        print(f"Error while merging files: {e}")

# Entry point
if __name__ == "__main__":
    print("Processing the Reverse WHOIS API request...")
    handle_reverse_whois()
    print("Processing the crt.sh request...")
    handle_crt_sh()
    print("Merging the files and de-duplicating...")
    merge_files_and_deduplicate("whois.txt", "cert.txt", "all.txt")