阿里云邮箱异地登录报警
个人博客
个人博客直达地址
网站不断完善中里面拥有大量的脚本,并且源码完全开放 欢迎纯白嫖。关注公众私信可免费写脚本
效果展示
功能概述
这段 Python 代码用于从阿里云邮箱的 API 获取最近十分钟的用户登录活动记录,并将这些记录存储到 MySQL 数据库中。它会检查是否存在异地登录的情况,如果发现异常登录(即 IP 地址、设备 ID 和城市信息均发生变化),进行了多维度判断,然后则会发送钉钉报警通知。
主要步骤
-
获取访问令牌
- 使用阿里云提供的
client_id
和client_secret
从 OAuth2 服务器获取访问令牌。
- 使用阿里云提供的
-
获取登录记录
- 使用获取到的访问令牌请求阿里云的用户登录记录 API。
- 请求中指定了时间范围为当前时间前十分钟至现在,并筛选出
USER_LOGIN
活动。
-
处理记录
- 从 API 响应中筛选出应用名称为 “阿里邮箱网页版” 的记录,并排除指定的邮箱地址。
- 将符合条件的记录插入 MySQL 数据库,若记录中 IP 地址、设备 ID 或城市信息发生变化,则发送钉钉报警。
-
数据库操作
- 连接 MySQL 数据库,创建表(如果不存在),并插入或更新记录。
- 查询数据库中的记录来检测是否存在异地登录的情况。
- 如果发现 IP 地址、设备 ID 和城市信息都发生了变化,则发送报警消息。
-
发送报警
- 使用钉钉的 Webhook 将报警消息发送到指定的钉钉群组。
使用方法
-
配置
- 替换
webhook_url
、client_id
、client_secret
和ipinfo
的 API 密钥为你自己的。 - 替换数据库连接参数(
user
、password
、host
和database
)。 - 更新
emails
列表,排除需要忽略的邮箱地址。
- 替换
-
运行
- 将脚本保存为
.py
文件,确保你的环境中已安装requests
、mysql-connector-python
和ipinfo
库。 - 使用 Python 运行脚本:
python3 your_script_name.py
。
- 将脚本保存为
注意事项
-
安全性
- 确保你的
client_id
和client_secret
保密,不要暴露在代码中。 - 钉钉 Webhook URL 和数据库连接信息也应当保密。
- 确保你的
-
数据库表结构
- 确保 MySQL 数据库的表结构与代码中的 SQL 语句匹配。
-
API 限制
- 监控 API 请求限制,避免超出调用频率或额度。
-
IP 信息
- IP 信息查询可能需要额外的 API 配额或权限。
-
错误处理
- 代码中已经包括基本的错误处理,但在生产环境中可能需要更详细的日志记录和异常处理策略。
-
依赖库
- 确保你使用的库与代码兼容,并在虚拟环境中进行测试以避免版本冲突。
#!/usr/bin/python3
import json
import requests
from datetime import datetime, timedelta
import mysql.connector
import ipinfo
def send_alert_dingtalk(message):
webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=b0deef874b979eeb97d3e374e9c9867cb269d110427e9f590038bef0c40XXXX自己的webhook"
headers = {'Content-Type': 'application/json;charset=utf-8'}
data = {
"msgtype": "text",
"text": {
"content": message
}
}
response = requests.post(webhook_url, headers=headers, data=json.dumps(data))
if response.status_code != 200:
print("发送钉钉消息失败:", response.text)
# 获取访问令牌的URL
url_token = "https://alimail-cn.aliyuncs.com/oauth2/v2.0/token"
# 访问令牌请求的参数
token_payload = {
"grant_type": "client_credentials",
"client_id": "自己阿里云的id",
"client_secret": "自己阿里云的secret"
}
# 发送获取访问令牌的请求
token_response = requests.post(url_token, data=token_payload)
token_data = token_response.json()
access_token = token_data.get("access_token")
# 检查是否成功获取访问令牌
if not access_token:
print("获取访问令牌失败。")
exit()
# 指定的邮箱
emails = [
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
"排除的邮箱@自己邮箱域名.com",
]
# 计算当前时间和十分钟之前的时间
current_time = datetime.now()
ten_minutes_ago = current_time - timedelta(minutes=10)
# 使用获取到的访问令牌进行API请求
url_reports = "https://qiye.aliyun.com/rpc/v2/report/records/"
# API请求的参数
payload = {
"beginTimestamp": ten_minutes_ago.strftime("%Y-%m-%dT%H:%M:%S%z+08:00"),
"endTimestamp": current_time.strftime("%Y-%m-%dT%H:%M:%S%z+08:00"),
"activities": ["USER_LOGIN"],
"offset": 0,
"length": 100,
"filters": {}
}
# 请求头
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-cn",
"Authorization": f"Bearer {access_token}",
"Connection": "keep-alive",
"Content-Length": "154",
"Content-Type": "application/json;charset=UTF-8",
"Host": "work.aliyun.com",
"Origin": "https://work.aliyun.com",
"Referer": "https://work.aliyun.com/admin/",
"Sec-Ch-Ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
}
# 连接数据库
cnx = mysql.connector.connect(user='数据库user名', password='数据库密码',
host='数据库地址',
database='数据库名')
cursor = cnx.cursor()
# 创建表(如果不存在)
create_table_query = """
CREATE TABLE IF NOT EXISTS ali_email_info (
id INT AUTO_INCREMENT PRIMARY KEY,
activity VARCHAR(100),
app_name VARCHAR(100),
email VARCHAR(100),
ip VARCHAR(20),
city VARCHAR(100),
timestamp DATETIME,
user_id VARCHAR(100),
device_id VARCHAR(100),
trusted_device VARCHAR(10),
diff_region_login VARCHAR(10)
)
"""
cursor.execute(create_table_query)
# 发送API请求
try:
response = requests.post(url_reports, headers=headers, data=json.dumps(payload))
response_data = response.json()
# 打印满足条件的记录,并检查IP国家和trusted_device
filtered_records = [
record
for record in response_data.get("records", [])
if record.get("appName") == "阿里邮箱网页版" and record.get("email") not in emails
]
# 将记录插入数据库,并检查IP国家和trusted_device
insert_query = """
INSERT INTO ali_email_info
(activity, app_name, email, ip, city, timestamp, user_id, device_id, trusted_device, diff_region_login)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
for record in filtered_records:
timestamp_datetime = datetime.strptime(record.get("timestamp").replace("Z", "+00:00"), "%Y-%m-%dT%H:%M:%S%z")
timestamp_datetime += timedelta(hours=8)
data = (
record.get("activity"),
record.get("appName"),
record.get("email"),
record.get("ip"),
None, # 先设置为 None,后面再更新为城市信息
timestamp_datetime,
record.get("userId"),
json.loads(record.get('properties', {}).get('deviceId', '{}')).get('value', 'N/A'), # 取设备ID的value值
record.get('properties', {}).get('trustedDevice', 'N/A'),
record.get('properties', {}).get('diffRegionLogin', 'N/A')
)
# 查询IP对应的城市信息
try:
ip_handler = ipinfo.getHandler('自己的ipinfo的token')
ip_info = ip_handler.getDetails(record.get("ip"))
city = f"{ip_info.country} - {ip_info.city}"
data = data[:4] + (city,) + data[5:] # 更新 data 中的 city
except Exception as e:
print("获取IP信息出错:", e)
# 检查数据库中是否已经存在该邮箱的记录
check_query = "SELECT * FROM ali_email_info WHERE email = %s ORDER BY timestamp ASC LIMIT 1"
cursor.execute(check_query, (record.get("email"),))
existing_record = cursor.fetchone()
print(existing_record)
# 如果数据库中没有该邮箱信息,则说明是新账号,直接插入数据
if not existing_record:
cursor.execute(insert_query, data)
cnx.commit()
else:
ip_changed = existing_record[4] != data[3]
device_id_changed = existing_record[7] != data[7]
city_changed = existing_record[-1] != data[4] # 城市信息在最后一位
# 如果同时满足IP地址变更、设备ID变更和城市变更,则发送报警
if ip_changed and device_id_changed and city_changed:
# 发送钉钉报警
message = f"阿里云邮箱异地登录告警通知!\n\n" \
f"本次登录信息:\n" \
f"邮箱:{data[2]}\n" \
f"IP 地址:{data[3]}\n" \
f"城市:{data[4]}\n" \
f"登录时间:{data[5]}\n" \
f"设备ID:{data[7]}\n" \
f"trusted_device:{data[8]}\n" \
f"diff_region_login:{data[9]}\n\n" \
f"上次登录信息:\n" \
f"邮箱:{existing_record[3]}\n" \
f"IP 地址:{existing_record[4]}\n" \
f"城市:{existing_record[10]}\n" \
f"登录时间:{existing_record[5]}\n" \
f"设备ID:{existing_record[7]}\n" \
f"trusted_device:{existing_record[8]}\n" \
f"diff_region_login:{existing_record[9]}\n"
send_alert_dingtalk(message)
# 更新数据库中的记录,保留最旧的一条记录
update_query = """
UPDATE ali_email_info
SET ip = %s, city = %s, timestamp = %s, user_id = %s, device_id = %s, trusted_device = %s, diff_region_login = %s
WHERE id = %s
"""
new_data = data[3:] + (existing_record[0],)
cursor.execute(update_query, new_data)
cnx.commit()
# 打印查询出的记录信息
print("查询结果:")
print(f"活动:{data[0]}")
print(f"应用名称:{data[1]}")
print(f"邮箱:{data[2]}")
print(f"IP 地址:{data[3]}")
print(f"城市:{data[4]}")
print(f"时间戳:{data[5]}")
print(f"用户ID:{data[6]}")
print(f"设备ID:{data[7]}")
print(f"trusted_device:{data[8]}")
print(f"diff_region_login:{data[9]}")
print("======================================")
except Exception as e:
print("请求失败:", e)
cnx.close()
标签:data,报警,排除,record,域名,异地,邮箱,com
From: https://blog.csdn.net/zhumengkang123/article/details/141721094