不管是iOS APP,微信小程序,还是浏览器,随着对安全要求的提高,HTTPS已经几乎是所有网站的标配的。作为运维,监控并保证SSL证书的可用性就成了一个很现实的问题。
在早期,域名不是很多的情况下,使用的SSL证书数量有限。可以采用手动录入,定期检查的方式来确保SSL证书在有效期内。但是随着业务的增长,手动录入到列表的方式可能会出现遗漏,于是采用了从域名解析的角度来处理。
不管是何种情况,证书检查的核心是可以通过域名(域名获取),自动判断SSL证书的有效期(证书有效性判断),并对即将到期的证书通过适当的方式进行报警通知(报警通知)。
证书有效性判断
这个是整个流程中的核心内容,在这里主要是用python代码实现,原理就是创建socket连接访问域名的443端口,获取证书信息。
代码如下
import socket
import ssl
def get_ssl_certificate_info(domain):
try:
# 建立一个与目标域名的SSL连接,并设置连接超时时间为5秒
context = ssl.create_default_context()
with socket.create_connection((domain, 443), timeout=5) as sock:
with context.wrap_socket(sock, server_hostname=domain) as ssock:
# 获取SSL证书
cert = ssock.getpeercert()
# 打印证书信息
print("Subject:", cert['subject'])
print("Issuer:", cert['issuer'])
print("Valid From:", cert['notBefore'])
print("Valid Until:", cert['notAfter'])
except (socket.gaierror, ConnectionRefusedError, ssl.CertificateError, socket.timeout) as e:
print(f"Error connecting to {domain}: {e}")
if __name__ == "__main__":
domain = "example.com" # 将此处替换为你要查询的域名
get_ssl_certificate_info(domain)
一般来说443端口如果不通,可能是这个域名没有配置https服务,所以超时时间就值设定了5s。
域名获取
在上文提到过,在早期如果域名比较少,可以手动维护域名列表,但是随着业务的扩展,难免会出现遗留。这里就换个思路从DNS方面获取所有域名。
根据各个DNS服务商的SDK一般都可以获取到域名列表,这里以阿里云为例,通过python SDK 获取域名列表。
在以下代码中
DescribeDomains 用于获取主域名
DescribeDomainRecords 用于获取域名记录
调用阿里云SDK需要ccess_key_id access_key_secret
from alibabacloud_alidns20150109 import models as alidns_20150109_models
from alibabacloud_alidns20150109.client import Client as Alidns20150109Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class DescribeDomains:
def __init__(self):
pass
@staticmethod
def create_client(
access_key_id: str,
access_key_secret: str,
) -> Alidns20150109Client:
"""
使用AK&SK初始化账号Client
@param access_key_id:
@param access_key_secret:
@return: Client
@throws Exception
"""
config = open_api_models.Config(
# 必填,您的 AccessKey ID,
access_key_id=access_key_id,
# 必填,您的 AccessKey Secret,
access_key_secret=access_key_secret
)
# Endpoint 请参考 https://api.aliyun.com/product/Alidns
config.endpoint = f'alidns.cn-hangzhou.aliyuncs.com'
return Alidns20150109Client(config)
@staticmethod
def main(
accessKeyId,
accessKeySecret,
):
# 请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 工程代码泄露可能会导致 AccessKey
# 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考,建议使用更安全的 STS
# 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
client = DescribeDomains.create_client(accessKeyId, accessKeySecret)
describe_domains_request = alidns_20150109_models.DescribeDomainsRequest()
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印 API 的返回值
res = client.describe_domains_with_options(describe_domains_request, runtime)
body = res.to_map()["body"]
domains = []
# 获取主域名列表并返回
for d in body["Domains"]["Domain"]:
domains.append(d["DomainName"])
return True, domains
except Exception as error:
# 如有需要,请打印 error
res = UtilClient.assert_as_string(error.message)
return False, str(res)
class DescribeDomainRecords:
def __init__(self):
# 初始化查询参数
self.page_number = 1
self.page_size = 100
self.total_records = []
@staticmethod
def create_client(
access_key_id: str,
access_key_secret: str,
) -> Alidns20150109Client:
"""
使用AK&SK初始化账号Client
@param access_key_id:
@param access_key_secret:
@return: Client
@throws Exception
"""
config = open_api_models.Config(
# 必填,您的 AccessKey ID,
access_key_id=access_key_id,
# 必填,您的 AccessKey Secret,
access_key_secret=access_key_secret
)
# Endpoint 请参考 https://api.aliyun.com/product/Alidns
config.endpoint = f'alidns.cn-hangzhou.aliyuncs.com'
return Alidns20150109Client(config)
def main(
self,
accessKeyId,
accessKeySecret,
domain_name,
):
while True:
# 请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 工程代码泄露可能会导致 AccessKey
# 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考,建议使用更安全的 STS
# 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
client = DescribeDomainRecords.create_client(accessKeyId, accessKeySecret)
describe_domain_records_request = alidns_20150109_models.DescribeDomainRecordsRequest(
domain_name=domain_name,
page_number=self.page_number,
page_size=self.page_size
)
# 创建RuntimeOptions 实例并设置超时时间为3分钟
runtime = util_models.RuntimeOptions()
runtime.read_timeout = 180000
try:
# 复制代码运行请自行打印 API 的返回值
res = client.describe_domain_records_with_options(describe_domain_records_request, runtime)
body = res.to_map()["body"]
records = body["DomainRecords"]["Record"]
for d in records:
sub_domain = d["RR"] + "." + d["DomainName"]
self.total_records.append(sub_domain)
# 判断是否还有下一页
if len(records) < self.page_size:
return True, self.total_records
else:
self.page_number += 1
except Exception as error:
# 如有需要,请打印 error
res = UtilClient.assert_as_string(error.message)
return False, str(res)
if __name__ == "__main__":
key = "xxxx"
secret = "yyyy"
success, domains = DescribeDomains.main(key, secret)
if success:
for domain in domains:
success, total_records = DescribeDomainRecords().main(key, secret, domain)
if success:
print(total_records)
报警通知
通知方式有多种,不管是钉钉还是微信,原理上大同小异。这里就不做过多赘述。
以上三部分组合在一起,再部署到crontab或者celery中就是一个完整的SSL证书有效期报警流程了。
标签:__,domain,证书,self,报警,access,SSL,域名,key From: https://blog.51cto.com/quietguoguo/8130667