说明:本文所有操作仅为作者的学习记录,严禁用于非法测试!
一、背景
在安全运营之暴露资产探测一文中作者分享了一种快速探测暴露资产的思路,可以快速的找到IP地址暴露在互联网上的端口服务。但是如果这些端口中暴露了一些高危的API服务,那就还要做进一步的检测,因此在本文中作者将继续说明对暴露在互联网的高危API接口进行探测的方法,从而在安全运营中更好更细的管理好自身的资产,减少风险暴露。
二、需求与目标
既然要做安全运营,当然是希望能够自动化的实现对高危API接口进行探测,工具能够自动周期性的执行,并将探测结果发送到安全运营人员的邮箱,由于可能需要探测的资源比较多,因此还要使用多线程。
三、思路
基于上述需求,作者计划编写一个Python脚本,脚本对常见的API接口进行扫描,并将扫描结果通过邮件发送到指定的邮箱,同时将脚本通过定时任务定期执行。将待扫描的IP地址及高危API接口分别存入文本中,在脚本中以参数的方式进行传递,并且通过手工的方式指定线程的并发数量,当然也可以设置一个默认值。如下收集了一些常见的高危API接口,这些接口是不应该出现在互联网上:
四、脚本编写及说明
(一) API扫描方法编写
首先我编写了一个装饰器,用于记录版本信息,以及计算每次检测耗费的时间
def timeCal(f): # 装饰器,用于显示版本信息以及计算检测耗费时间
print("#********************")
print("# version 0.3")
print("# edit by xxxxx")
print("#********************")
def func():
start = time.time()
f()
end = time.time()
duration = end - start
print("Crack completed,Time cost %s seconds " % duration)
return func
#这里使用threading模块完成多线程编写,对URL进行检测,查看返回的状态码是否为200,如果为200说明该URL对应的API接口可以被访问到,然后将其记录入列表。
class MyThread(threading.Thread):
def __init__(self, queue,):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while not self._queue.empty():
url = self._queue.get()
urllib3.disable_warnings()
try:
print("Testing URL: %s" % url)
res = requests.get(url, verify=False, allow_redirects=False, timeout=10)
status_code = res.status_code
if status_code == 200:
print("======> %s status is 200" % url)
results.append(url)
except Exception as e:
pass
#这部分是解决脚本入参的,读取用户的输入,指定IP地址列表清单、API接口清单以及多线程数量,这里默认线程数量设置为100
@timeCal
def main():
parser = optparse.OptionParser("Usage:%prog -i <ipAddressFile> -w <wordFile> -t <thsCount>")
parser.add_option("-i", dest="tips", type="string", help="specify ip address")
parser.add_option("-w", dest="twordfile", type="string", help="specify words file")
parser.add_option("-t", dest="thsCount", type="string", help="specify threads count", default=100)
(options, args) = parser.parse_args()
ips = options.tips
wordfile = options.twordfile
thsCount = options.thsCount
if not ips or not wordfile or not thsCount:
print(parser.usage)
exit(0)
threads = []
threads_count = int(thsCount)
q = queue.Queue()
#读取IP地址列表,对每个IP地址与API接口进行拼接为完整的URL
with open(ips) as ips:
for ip in ips:
ip = ip.strip()
with open(wordfile) as fd:
for uri in fd:
uri = uri.strip()
url = "https://" + ip + "/" + uri
q.put(url)
for i in range(threads_count):
threads.append(MyThread(q))
for i in threads:
i.start()
for i in threads:
i.join()
(二) 邮件发送方法编写
def sendmail(ti, content):
print("扫描完成,开始发送邮件……")
mail_host = "smtp.163.com"
mail_user = "xxxxxxxxx"
mail_pass = "xxxxxxxxxxx"
newContent = '''
本次API接口扫描情况如下:
1、本次扫描时间: %s
2、本次扫描结果:\n %s
''' % (ti, content)
# 发件人
sender = " xxxxxxxxx @163.com"
# 收件人
receivers = ['[email protected]']
# 邮件正文
message = MIMEText(newContent, "plain", "utf-8")
# 邮件头部,多个收件人,注意收件人地址拼接
message['From'] = sender
message['To'] = Header(';'.join(receivers), 'utf-8')
# 邮件主题
subject = "XXXX公司 API接口泄露监测"
message['Subject'] = Header(subject, 'utf-8')
try:
smtpObj = smtplib.SMTP(mail_host, 25)
smtpObj.login(mail_user, mail_pass)
smtpObj.sendmail(sender, receivers, message.as_string())
print("邮件发送成功")
except smtplib.SMTPException as Exception:
print(Exception)
print("Error: 发送邮件失败")
(三)main方法编写
if __name__ == '__main__':
main()
ti = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
content = '\n'
print("=======================================")
print("扫描时间: %s " % ti)
print("Test finished! The results are bellow:")
if len(results) != 0:
for result in results:
print(result)
content = content.join(results)
sendmail(ti, content)
else:
noResults = '本次扫描未发现高危API接口暴露信息'
print(noResults)
sendmail(ti, noResults)
五、工具使用
1、 使用方法:
Usage:%prog -i <ipAddressFile> -w <wordFile> -t <thsCount>
2、 测试一个存在API接口泄露的IP地址,不手工设置进程数量,默认为100.
python .\unauthorization04.py -i .\ip1.txt -w .\actuator.txt
控制台输出:
邮箱收到的提示:
3、测试一个存在API接口泄露的IP地址,手工设置进程数量100
python .\unauthorization04.py -i .\ip1.txt -w .\actuator.txt -t 50
控制台输出:
邮箱收到的提示:
4、通过上述测试,脚本已经达到目标,接下来就是在linux服务器上以定时任务周期性执行即可,不再演示。
标签:__,接口,探测,API,threads,IP地址,print From: https://blog.51cto.com/u_9652359/8072947