华为防火墙SSL_VPN异地登录报警
脚本功能
此Python脚本的主要功能是从Elasticsearch中检索登录成功的日志,检查用户的登录信息是否发生变化,并将相关信息存储到MySQL数据库中。如果检测到用户的登录IP地址或地理位置发生变化,则发送钉钉通知警告。如不知道es如何采集华为防火墙日志,可以关注我公众号私聊我。
个人博客
个人博客直达地址
网站不断完善中里面拥有大量的脚本,并且源码完全开放 欢迎纯白嫖。关注公众私信可免费写脚本
效果图
主要功能:
-
从Elasticsearch中查询日志:
- 查询最近10分钟内的
LOGONSUCCESS
日志。
- 查询最近10分钟内的
-
连接MySQL数据库:
- 连接到指定的MySQL数据库,并检查/创建名为
login_logs
的表,用于存储用户登录记录。
- 连接到指定的MySQL数据库,并检查/创建名为
-
解析日志信息:
- 提取用户名称、源IP地址、登录时间、登录模式和认证模式。
-
检查用户登录信息:
- 如果用户记录存在,比较新的IP地址和城市与数据库中的记录是否有所变化。
- 如果发生变化,发送钉钉通知,并更新数据库中的记录。
-
处理新用户记录:
- 如果用户记录不存在,插入新的登录记录到数据库。
使用方法
-
配置环境:
- 确保已安装以下Python库:
requests
、pymysql
和ipinfo
。 - 替换脚本中的
ipinfo.getHandler('93b3ed75decae0')
中的token为你的实际token。 - 根据实际情况配置Elasticsearch主机、MySQL数据库连接信息和钉钉Webhook地址。
- 确保已安装以下Python库:
-
运行脚本:
- 运行此脚本,脚本会自动连接到Elasticsearch和MySQL,进行数据查询和处理。
注意事项
-
安全性:
- 避免将敏感信息(如数据库密码、Webhook地址)硬编码在脚本中,可以使用环境变量或配置文件来存储这些信息。
-
API调用限制:
ipinfo
API可能有调用限制,请注意不要超出API调用限制。
-
异常处理:
- 脚本中已包含基础的异常处理,但建议根据实际需求添加更多详细的错误处理和日志记录,以便在生产环境中更好地监控和调试。
-
数据库表结构:
- 确保MySQL数据库中的
login_logs
表结构与脚本中的定义匹配。
- 确保MySQL数据库中的
-
数据更新:
- 脚本每次运行都会查询Elasticsearch和更新数据库,确保数据库操作的事务一致性和性能。
-
Elasticsearch查询:
- 确保Elasticsearch索引和字段名称与脚本中的查询匹配,避免查询失败。
#!/usr/bin/python3
import requests
from datetime import datetime
import pymysql
import ipinfo
# Elasticsearch主机和端口
es_host = "http://10.1.7.110:9200"
# MySQL数据库连接信息
db_host = "自己mysql的ip"
db_user = "univpn"
db_password = "自己的密码"
db_name = "univpn"
# 钉钉Webhook地址
webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=b0deef874b979eeb97d3e374e9c9867cb269d110427e9f590038bef0c40XXXX自己的webhook地址"
# 获取当前日期
current_date = datetime.now().strftime("%Y.%m.%d")
index_name = f"172_31_1_1-{current_date}"
# 构造查询语句
query = {
"query": {
"bool": {
"must": [
{"range": {"@timestamp": {"gte": f"now-10m"}}},
{"match_phrase": {"message": "LOGONSUCCESS"}}
]
}
}
}
# 发送请求到Elasticsearch
url = f"{es_host}/{index_name}/_search?pretty"
headers = {"Content-Type": "application/json"}
response = requests.get(url, headers=headers, json=query)
# 连接数据库
conn = pymysql.connect(host=db_host, user=db_user, password=db_password, database=db_name)
cursor = conn.cursor()
# 创建表(如果不存在)
cursor.execute("""
CREATE TABLE IF NOT EXISTS login_logs (
id INT AUTO_INCREMENT PRIMARY KEY,
user_name VARCHAR(255),
source_ip VARCHAR(255),
ip_city VARCHAR(255),
logon_time DATETIME,
logon_mode VARCHAR(255),
authentication_mode VARCHAR(255)
)
""")
def ip_city_changed(ip):
try:
ip_handler = ipinfo.getHandler('93b3ed75decae0') # 替换为你的ipinfo token
ip_info = ip_handler.getDetails(ip)
ip_city = f"{ip_info.country} - {ip_info.city}"
return ip_city
except Exception as e:
print("获取IP信息出错:", e)
return "Unknown"
# 处理查询结果
for hit in response.json()["hits"]["hits"]:
source = hit["_source"]
message = source["message"]
user_name = message.split("User Name=")[1].split(",")[0]
source_ip = message.split("Source IP=")[1].split(",")[0]
logon_time = message.split("Logon Time=")[1].split(",")[0]
logon_mode = message.split("Logon Mode=")[1].split(",")[0]
authentication_mode = message.split("Authentication Mode=")[1].split(",")[0]
print(f"用户名: {user_name}, 源IP地址: {source_ip}, 登录时间: {logon_time}, 登录模式: {logon_mode}, 认证模式: {authentication_mode}")
# 查询数据库中是否存在该用户
cursor.execute("SELECT * FROM login_logs WHERE user_name = %s", (user_name,))
result = cursor.fetchone()
if result:
# 用户存在,比较IP和城市是否变更
if result[2] != source_ip or result[6] != ip_city_changed(source_ip):
# IP或城市变更,发送异地登录通知
last_login_time = result[3]
last_logon_mode = result[4]
last_authentication_mode = result[5]
last_ip_city = result[6]
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# 获取 IP 地址信息
ip_city = ip_city_changed(source_ip)
print("成功获取IP信息:", ip_city)
# 检查 IP 和城市是否同时变更
if result[2] != source_ip and result[6] != ip_city:
# IP和城市都变更,发送异地登录通知
message = f"用户名: {user_name}\n本次登录时间: {logon_time}\n本次登录IP: {source_ip}({ip_city})\n本次登录模式: {logon_mode}\n本次认证模式: {authentication_mode}\n\n\n上次登录时间: {last_login_time}\n上次登录IP: {result[2]}({last_ip_city})\n上次登录模式: {last_logon_mode}\n上次认证模式: {last_authentication_mode}\n当前时间: {current_time}"
data = {
"msgtype": "text",
"text": {
"content": f"univpn异地登录警告通知:\n{message}"
}
}
requests.post(webhook_url, json=data)
# 更新数据库
cursor.execute(
"UPDATE login_logs SET source_ip = %s, ip_city = %s, logon_time = %s, logon_mode = %s, authentication_mode = %s WHERE user_name = %s",
(source_ip, ip_city, logon_time, logon_mode, authentication_mode, user_name))
except Exception as e:
ip_city = "Unknown"
print("获取IP信息出错:", e)
if result[2] != source_ip and result[6] != ip_city:
# IP和城市都变更,发送异地登录通知
message = f"用户名: {user_name}\n本次登录时间: {logon_time}\n本次登录IP: {source_ip}\n本次登录模式: {logon_mode}\n本次认证模式: {authentication_mode}\n\n\n上次登录时间: {last_login_time}\n上次登录IP: {result[2]}({last_ip_city})\n上次登录模式: {last_logon_mode}\n上次认证模式: {last_authentication_mode}\n当前时间: {current_time}"
data = {
"msgtype": "text",
"text": {
"content": f"Univpn异地登录警告通知:\n{message}"
}
}
requests.post(webhook_url, json=data)
# 更新数据库
cursor.execute(
"UPDATE login_logs SET source_ip = %s, ip_city = %s, logon_time = %s, logon_mode = %s, authentication_mode = %s WHERE user_name = %s",
(source_ip, ip_city, logon_time, logon_mode, authentication_mode, user_name))
else:
# 用户不存在,插入新记录
try:
# 获取 IP 地址信息
ip_city = ip_city_changed(source_ip)
print("成功获取IP信息:", ip_city)
except Exception as e:
ip_city = "Unknown"
print("获取IP信息出错:", e)
cursor.execute("INSERT INTO login_logs (user_name, source_ip, ip_city, logon_time, logon_mode, authentication_mode) VALUES (%s, %s, %s, %s, %s, %s)",
(user_name, source_ip, ip_city, logon_time, logon_mode, authentication_mode))
conn.commit()
# 关闭数据库连接
cursor.close()
conn.close()
标签:city,登录,source,ip,防火墙,SSL,mode,VPN,logon
From: https://blog.csdn.net/zhumengkang123/article/details/141720535