首页 > 其他分享 >阿里云邮箱异地登录报警

阿里云邮箱异地登录报警

时间:2024-09-02 16:23:37浏览次数:12  
标签:data 报警 排除 record 域名 异地 邮箱 com

阿里云邮箱异地登录报警

个人博客

个人博客直达地址
网站不断完善中里面拥有大量的脚本,并且源码完全开放 欢迎纯白嫖。关注公众私信可免费写脚本

效果展示

在这里插入图片描述

功能概述

这段 Python 代码用于从阿里云邮箱的 API 获取最近十分钟的用户登录活动记录,并将这些记录存储到 MySQL 数据库中。它会检查是否存在异地登录的情况,如果发现异常登录(即 IP 地址、设备 ID 和城市信息均发生变化),进行了多维度判断,然后则会发送钉钉报警通知。

主要步骤

  1. 获取访问令牌

    • 使用阿里云提供的 client_idclient_secret 从 OAuth2 服务器获取访问令牌。
  2. 获取登录记录

    • 使用获取到的访问令牌请求阿里云的用户登录记录 API。
    • 请求中指定了时间范围为当前时间前十分钟至现在,并筛选出 USER_LOGIN 活动。
  3. 处理记录

    • 从 API 响应中筛选出应用名称为 “阿里邮箱网页版” 的记录,并排除指定的邮箱地址。
    • 将符合条件的记录插入 MySQL 数据库,若记录中 IP 地址、设备 ID 或城市信息发生变化,则发送钉钉报警。
  4. 数据库操作

    • 连接 MySQL 数据库,创建表(如果不存在),并插入或更新记录。
    • 查询数据库中的记录来检测是否存在异地登录的情况。
    • 如果发现 IP 地址、设备 ID 和城市信息都发生了变化,则发送报警消息。
  5. 发送报警

    • 使用钉钉的 Webhook 将报警消息发送到指定的钉钉群组。

使用方法

  1. 配置

    • 替换 webhook_urlclient_idclient_secretipinfo 的 API 密钥为你自己的。
    • 替换数据库连接参数(userpasswordhostdatabase)。
    • 更新 emails 列表,排除需要忽略的邮箱地址。
  2. 运行

    • 将脚本保存为 .py 文件,确保你的环境中已安装 requestsmysql-connector-pythonipinfo 库。
    • 使用 Python 运行脚本:python3 your_script_name.py

注意事项

  1. 安全性

    • 确保你的 client_idclient_secret 保密,不要暴露在代码中。
    • 钉钉 Webhook URL 和数据库连接信息也应当保密。
  2. 数据库表结构

    • 确保 MySQL 数据库的表结构与代码中的 SQL 语句匹配。
  3. API 限制

    • 监控 API 请求限制,避免超出调用频率或额度。
  4. IP 信息

    • IP 信息查询可能需要额外的 API 配额或权限。
  5. 错误处理

    • 代码中已经包括基本的错误处理,但在生产环境中可能需要更详细的日志记录和异常处理策略。
  6. 依赖库

    • 确保你使用的库与代码兼容,并在虚拟环境中进行测试以避免版本冲突。
#!/usr/bin/python3


import json
import requests
from datetime import datetime, timedelta
import mysql.connector
import ipinfo

def send_alert_dingtalk(message):
    webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=b0deef874b979eeb97d3e374e9c9867cb269d110427e9f590038bef0c40XXXX自己的webhook"
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    data = {
        "msgtype": "text",
        "text": {
            "content": message
        }
    }
    response = requests.post(webhook_url, headers=headers, data=json.dumps(data))
    if response.status_code != 200:
        print("发送钉钉消息失败:", response.text)

# 获取访问令牌的URL
url_token = "https://alimail-cn.aliyuncs.com/oauth2/v2.0/token"

# 访问令牌请求的参数
token_payload = {
    "grant_type": "client_credentials",
    "client_id": "自己阿里云的id",
    "client_secret": "自己阿里云的secret"
}

# 发送获取访问令牌的请求
token_response = requests.post(url_token, data=token_payload)
token_data = token_response.json()
access_token = token_data.get("access_token")

# 检查是否成功获取访问令牌
if not access_token:
    print("获取访问令牌失败。")
    exit()

# 指定的邮箱
emails = [
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
    "排除的邮箱@自己邮箱域名.com",
]


# 计算当前时间和十分钟之前的时间
current_time = datetime.now()
ten_minutes_ago = current_time - timedelta(minutes=10)

# 使用获取到的访问令牌进行API请求
url_reports = "https://qiye.aliyun.com/rpc/v2/report/records/"

# API请求的参数
payload = {
    "beginTimestamp": ten_minutes_ago.strftime("%Y-%m-%dT%H:%M:%S%z+08:00"),
    "endTimestamp": current_time.strftime("%Y-%m-%dT%H:%M:%S%z+08:00"),
    "activities": ["USER_LOGIN"],
    "offset": 0,
    "length": 100,
    "filters": {}
}

# 请求头
headers = {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-cn",
    "Authorization": f"Bearer {access_token}",
    "Connection": "keep-alive",
    "Content-Length": "154",
    "Content-Type": "application/json;charset=UTF-8",
    "Host": "work.aliyun.com",
    "Origin": "https://work.aliyun.com",
    "Referer": "https://work.aliyun.com/admin/",
    "Sec-Ch-Ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
}

# 连接数据库
cnx = mysql.connector.connect(user='数据库user名', password='数据库密码',
                              host='数据库地址',
                              database='数据库名')
cursor = cnx.cursor()

# 创建表(如果不存在)
create_table_query = """
CREATE TABLE IF NOT EXISTS ali_email_info (
  id INT AUTO_INCREMENT PRIMARY KEY,
  activity VARCHAR(100),
  app_name VARCHAR(100),
  email VARCHAR(100),
  ip VARCHAR(20),
  city VARCHAR(100),
  timestamp DATETIME,
  user_id VARCHAR(100),
  device_id VARCHAR(100),
  trusted_device VARCHAR(10),
  diff_region_login VARCHAR(10)
)
"""

cursor.execute(create_table_query)

# 发送API请求
try:
    response = requests.post(url_reports, headers=headers, data=json.dumps(payload))
    response_data = response.json()

    # 打印满足条件的记录,并检查IP国家和trusted_device
    filtered_records = [
        record
        for record in response_data.get("records", [])
        if record.get("appName") == "阿里邮箱网页版" and record.get("email") not in emails
    ]

    # 将记录插入数据库,并检查IP国家和trusted_device
    insert_query = """
    INSERT INTO ali_email_info
    (activity, app_name, email, ip, city, timestamp, user_id, device_id, trusted_device, diff_region_login) 
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    for record in filtered_records:
        timestamp_datetime = datetime.strptime(record.get("timestamp").replace("Z", "+00:00"), "%Y-%m-%dT%H:%M:%S%z")
        timestamp_datetime += timedelta(hours=8)

        data = (
            record.get("activity"),
            record.get("appName"),
            record.get("email"),
            record.get("ip"),
            None,  # 先设置为 None,后面再更新为城市信息
            timestamp_datetime,
            record.get("userId"),
            json.loads(record.get('properties', {}).get('deviceId', '{}')).get('value', 'N/A'),  # 取设备ID的value值
            record.get('properties', {}).get('trustedDevice', 'N/A'),
            record.get('properties', {}).get('diffRegionLogin', 'N/A')
        )

        # 查询IP对应的城市信息
        try:
            ip_handler = ipinfo.getHandler('自己的ipinfo的token')
            ip_info = ip_handler.getDetails(record.get("ip"))
            city = f"{ip_info.country} - {ip_info.city}"
            data = data[:4] + (city,) + data[5:]  # 更新 data 中的 city
        except Exception as e:
            print("获取IP信息出错:", e)

        # 检查数据库中是否已经存在该邮箱的记录
        check_query = "SELECT * FROM ali_email_info WHERE email = %s ORDER BY timestamp ASC LIMIT 1"
        cursor.execute(check_query, (record.get("email"),))
        existing_record = cursor.fetchone()
        print(existing_record)
        # 如果数据库中没有该邮箱信息,则说明是新账号,直接插入数据
        if not existing_record:
            cursor.execute(insert_query, data)
            cnx.commit()
        else:
            ip_changed = existing_record[4] != data[3]
            device_id_changed = existing_record[7] != data[7]
            city_changed = existing_record[-1] != data[4]  # 城市信息在最后一位

            # 如果同时满足IP地址变更、设备ID变更和城市变更,则发送报警
            if ip_changed and device_id_changed and city_changed:
                # 发送钉钉报警
                message = f"阿里云邮箱异地登录告警通知!\n\n" \
                          f"本次登录信息:\n" \
                          f"邮箱:{data[2]}\n" \
                          f"IP 地址:{data[3]}\n" \
                          f"城市:{data[4]}\n" \
                          f"登录时间:{data[5]}\n" \
                          f"设备ID:{data[7]}\n" \
                          f"trusted_device:{data[8]}\n" \
                          f"diff_region_login:{data[9]}\n\n" \
                          f"上次登录信息:\n" \
                          f"邮箱:{existing_record[3]}\n" \
                          f"IP 地址:{existing_record[4]}\n" \
                          f"城市:{existing_record[10]}\n" \
                          f"登录时间:{existing_record[5]}\n" \
                          f"设备ID:{existing_record[7]}\n" \
                          f"trusted_device:{existing_record[8]}\n" \
                          f"diff_region_login:{existing_record[9]}\n"
                send_alert_dingtalk(message)

            # 更新数据库中的记录,保留最旧的一条记录
            update_query = """
            UPDATE ali_email_info 
            SET ip = %s, city = %s, timestamp = %s, user_id = %s, device_id = %s, trusted_device = %s, diff_region_login = %s
            WHERE id = %s
            """
            new_data = data[3:] + (existing_record[0],)
            cursor.execute(update_query, new_data)
            cnx.commit()

        # 打印查询出的记录信息
        print("查询结果:")
        print(f"活动:{data[0]}")
        print(f"应用名称:{data[1]}")
        print(f"邮箱:{data[2]}")
        print(f"IP 地址:{data[3]}")
        print(f"城市:{data[4]}")
        print(f"时间戳:{data[5]}")
        print(f"用户ID:{data[6]}")
        print(f"设备ID:{data[7]}")
        print(f"trusted_device:{data[8]}")
        print(f"diff_region_login:{data[9]}")
        print("======================================")

except Exception as e:
    print("请求失败:", e)

cnx.close()

标签:data,报警,排除,record,域名,异地,邮箱,com
From: https://blog.csdn.net/zhumengkang123/article/details/141721094

相关文章

  • 华为防火墙SSL_VPN异地登录报警
    华为防火墙SSL_VPN异地登录报警脚本功能此Python脚本的主要功能是从Elasticsearch中检索登录成功的日志,检查用户的登录信息是否发生变化,并将相关信息存储到MySQL数据库中。如果检测到用户的登录IP地址或地理位置发生变化,则发送钉钉通知警告。如不知道es如何采集华为防火......
  • ZeroTier-异地组网
    目录ZeroTier介绍脚本安装环境要求安装docker下载项目源码执行安装脚本下载planet文件新建网络创建网络分配网络客户端配置linux下载客户端安装包更换planet重启zerotier-one加入网络查看加入的网络信息管理后台同意加入请求查看角色windows下载安装更换planet重启服务加入网络......
  • 电科校园邮箱系统逻辑漏洞
    校园邮件系统逻辑漏洞导致邮件轰炸邮件轰炸首先通过自己的账号登录进入邮件系统之后,进入到信息修改的界面发现存在邮箱绑定功能,在尝试绑定自己的邮箱之后,可以看到存在提示“找回密码时可以使用备用邮箱找回”。输入邮箱密码之后进入到下一个页面在此页面完成邮箱绑定,在输入......
  • 解锁公众号无限邮箱地址的秘诀
    在当今数字化的时代,拥有多个邮箱地址对于公众号运营者来说有着诸多便利。今天,就为大家深度剖析获取无限邮箱地址的方法,助力你的公众号运营更上一层楼。一、谷歌邮箱:无限可能的宝藏Gmail可不仅仅是一个普通的邮箱服务,它隐藏着一个特殊的功能,能让我们通过一个邮箱账号生成......
  • 操作指南:在Ubuntu 24.04上安装网易邮箱大师
    网易邮箱大师是一款功能强大的邮箱客户端,支持多种邮箱的管理,提供了便利的邮件收发、日历管理和任务跟踪等功能。对于使用Ubuntu的用户而言,虽然Linux系统上的邮箱客户端有所限制,但我们仍然希望能够在这个开源的环境中实现高效的邮件管理。本文将详细介绍如何在Ubuntu24.04上安......
  • 点击Excel中的邮箱地址,如何默认打开FoxMail邮箱。
    在Windows系统中,若要设置Foxmail为默认邮件程序,请按照以下步骤操作:1.打开控制面板(命令:control),选择“查看方式”中的“大图标”,找到并点击“默认程序”。 2.在默认程序界面,点击“将文件类型或协议与程序关联”。 3.在默认应用中,找到电子邮件,点击,选择Foxmail。通过......
  • 109.微软邮箱强制要求使用MS Authenticator手机APP但中国没有GooglePlay的处理办法
    109.微软邮箱强制要求使用MSAuthenticator手机APP但中国没有GooglePlay的处理办法  背景: 微软邮箱强制用户使用它的Authenticator手机验证器APP(只能跳过3次), 而大部分中国用户手机上是没有谷歌框架和GooglePlay的,所以导致很多用户无法使用微软企业邮箱微软自己也发现了......
  • Java 使用QQ邮箱的接收&发送功能,入门级教程
    进入qq邮箱主页面,点击账号下滑找到POP3...如果没有开启,需要开启,开启后,点击管理服务然后点击生成授权码按照步骤执行完成后,会给你需要的授权码1.拿到授权码后,导入相关依赖,和yml相关配置,注意修改配置文件的信息<dependency><groupId>org.apache.commons</......
  • Yololov5+Pyqt5+Opencv 实时城市积水报警系统
    在现代城市生活中,积水问题不仅影响交通和人们的日常生活,还可能对城市基础设施造成潜在的威胁。为了快速、准确地识别和应对积水问题,使用计算机视觉技术进行智能积水检测成为一个重要的解决方案。在这篇博客中,我将带你一步步实现一个基于YOLOv5的积水检测系统,帮助你轻松应对城市......
  • 氢气传感器的报警值如何设定?
            氢气传感器的报警值设定需要遵循一定的规范和标准,以确保使用场所的安全。以下是关于氢气传感器报警值设定的详细分析:        一、报警值规范        根据中国的《石油化工可燃气体和有毒气体检测报警设计标准》(GB/T50493-2019),氢气传感器的......