首页 > 编程语言 >太极安全监控系统1.0(Python)

太极安全监控系统1.0(Python)

时间:2024-10-26 11:16:06浏览次数:3  
标签:1.0 Python ip self file address 安全监控 data port

一、项目介绍和功能介绍

1.项目介绍

安全监控系统 是一个综合性的安全监控和防护工具,旨在帮助系统管理员检测和应对网络中的可疑活动。该系统集成了多种安全技术和工具,包括日志分析、网络流量监控、机器学习模型、动态防火墙规则配置、蜜罐部署、沙箱管理和自动反击功能。通过这些功能,系统能够有效地识别和应对潜在的安全威胁。

2.兼容性

操作系统: Windows 10
依赖库: scapy, whois, numpy, scikit-learn, tensorflow, geopy, Evtx, psutil, PyQt5, matplotlib
权限: 确保运行程序的用户具有管理员权限,以便配置防火墙规则和捕获网络流量。

import os
import sys
import subprocess
import re
import datetime
import threading
import tkinter as tk
from tkinter import messagebox, simpledialog, ttk
import scapy.all as scapy
import whois
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
import json
import random
import socket
import pickle
from geopy.geocoders import Nominatim
from collections import defaultdict
import Evtx.Evtx as evtx
import tensorflow as tf
import psutil
import logging
import time
import multiprocessing
from PyQt5.QtWidgets import QApplication, QMainWindow, QTabWidget, QWidget, QVBoxLayout, QPushButton, QListWidget, QLabel, QTreeView, QFileSystemModel, QTableWidget, QTableWidgetItem, QComboBox, QProgressBar
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QPixmap, QImage
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas

# 配置日志
logging.basicConfig(filename=os.path.join('logs', 'security_system.log'), level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 确保 TensorFlow 使用 GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_memory_growth(gpus[0], True)
    except RuntimeError as e:
        print(e)
        logging.error(f"TensorFlow GPU configuration error: {e}")

# 全局变量
suspicious_activities = []
packets = []
geolocator_cache = {}
whois_cache = {}
taiji_shield = None

# 配置防火墙规则
def configure_firewall():
    print("配置防火墙规则...")
    logging.info("配置防火墙规则...")
    # Windows 防火墙规则配置示例
    subprocess.run(["netsh", "advfirewall", "set", "currentprofile", "state", "on"])
    # 阻断已知恶意 IP 地址
    known_malicious_ips = ["192.168.1.100", "10.0.0.1"]
    for ip in known_malicious_ips:
        subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=in", "action=block", "remoteip=" + ip])
        subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=out", "action=block", "remoteip=" + ip])

# 读取和解析系统日志
def analyze_logs(log_file):
    print(f"分析日志文件 {log_file}...")
    logging.info(f"分析日志文件 {log_file}...")
    suspicious_activities = []
    
    try:
        with evtx.Evtx(log_file) as log:
            for record in log.records():
                xml = record.xml()
                if "IPTables-Input" in xml or "IPTables-Output" in xml or "Security-Audit" in xml:
                    match = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', xml)
                    if match:
                        ip_address = match.group(1)
                        timestamp = re.search(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', xml)
                        if timestamp:
                            timestamp = timestamp.group(0)
                            suspicious_activities.append((timestamp, ip_address, xml.strip()))
    except Exception as e:
        print(f"分析日志文件时发生错误: {e}")
        logging.error(f"分析日志文件时发生错误: {e}")
    
    return suspicious_activities

# 使用 Scapy 抓取特定端口的流量
def capture_traffic(interface, port):
    print(f"抓取 {interface} 上的 {port} 端口流量...")
    logging.info(f"抓取 {interface} 上的 {port} 端口流量...")
    packets = scapy.sniff(iface=interface, filter=f"port {port}", count=100)
    return packets

# 获取入侵者地理位置
def get_geolocation(ip_address, geolocator_cache):
    if ip_address in geolocator_cache:
        return geolocator_cache[ip_address]
    
    try:
        geolocator = Nominatim(user_agent="security_system")
        location = geolocator.geocode(ip_address)
        if location:
            geolocator_cache[ip_address] = f"{location.city}, {location.country}"
            return geolocator_cache[ip_address]
        else:
            geolocator_cache[ip_address] = "未知位置"
            return "未知位置"
    except Exception as e:
        geolocator_cache[ip_address] = f"获取地理位置失败: {str(e)}"
        logging.error(f"获取地理位置失败: {e}")
        return geolocator_cache[ip_address]

# 验证 IP 地址
def verify_ip(ip_address, whois_cache):
    if ip_address in whois_cache:
        return whois_cache[ip_address]
    
    try:
        w = whois.whois(ip_address)
        if w and w.get('nets'):
            whois_cache[ip_address] = w.nets[0].get('description', "未知描述")
            return whois_cache[ip_address]
        else:
            whois_cache[ip_address] = "未知描述"
            return "未知描述"
    except Exception as e:
        whois_cache[ip_address] = f"验证 IP 失败: {str(e)}"
        logging.error(f"验证 IP 失败: {e}")
        return whois_cache[ip_address]

# 生成报告
def generate_report(suspicious_activities, report_file, geolocator_cache, whois_cache):
    print(f"生成报告到 {report_file}...")
    logging.info(f"生成报告到 {report_file}...")
    os.makedirs(os.path.dirname(report_file), exist_ok=True)
    with open(report_file, 'w') as file:
        file.write("可疑活动报告\n")
        file.write("=" * 30 + "\n")
        file.write(f"生成时间: {datetime.datetime.now()}\n")
        file.write("\n")
        file.write("时间戳\tIP 地址\t地理位置\t描述\t日志条目\n")
        file.write("-" * 80 + "\n")
        for activity in suspicious_activities:
            geolocation = get_geolocation(activity[1], geolocator_cache)
            description = verify_ip(activity[1], whois_cache)
            file.write(f"{activity[0]}\t{activity[1]}\t{geolocation}\t{description}\t{activity[2]}\n")

# 生成沙箱环境
def create_sandbox(ip_address):
    print(f"为 IP 地址 {ip_address} 创建沙箱...")
    logging.info(f"为 IP 地址 {ip_address} 创建沙箱...")
    sandbox_dir = os.path.join('temp', f'sandbox_{ip_address}')
    os.makedirs(sandbox_dir, exist_ok=True)
    
    # 模拟多线程和多核处理
    def simulate_computation():
        for i in range(1000000):
            pass
    
    processes = []
    for _ in range(2):
        process = multiprocessing.Process(target=simulate_computation)
        processes.append(process)
        process.start()
    
    for process in processes:
        process.join()
    
    with open(os.path.join(sandbox_dir, "README.txt"), 'w') as file:
        file.write(f"沙箱环境用于 IP 地址: {ip_address}\n")
        file.write("此目录被隔离以防止潜在威胁。\n")
    
    return sandbox_dir

# 自适应防护机制(太极护盾)
class TaiJiShield:
    def __init__(self):
        self.isolation_forest = IsolationForest(contamination=0.01)
        self.one_class_svm = OneClassSVM(nu=0.01, kernel='rbf', gamma='scale')
        self.scaler = StandardScaler()
        self.data = []
        self.model_trained = False
        self.model_path = os.path.join('models', 'taiji_model.pkl')
    
    def train(self, new_data):
        self.data.extend(new_data)
        self.data = np.array(self.data)
        self.data = self.scaler.fit_transform(self.data)
        self.isolation_forest.fit(self.data)
        self.one_class_svm.fit(self.data)
        self.model_trained = True
        self.save_model()
        logging.info("模型训练完成并保存")
    
    def incremental_train(self, new_data):
        if not self.model_trained:
            self.train(new_data)
            return
        
        new_data = np.array(new_data)
        new_data = self.scaler.transform(new_data)
        self.isolation_forest.fit(new_data)
        self.one_class_svm.fit(new_data)
        self.save_model()
        logging.info("模型增量训练完成并保存")
    
    def predict(self, data):
        if not self.model_trained:
            return 1, 1
        data = np.array(data).reshape(1, -1)
        data = self.scaler.transform(data)
        isolation_forest_pred = self.isolation_forest.predict(data)
        one_class_svm_pred = self.one_class_svm.predict(data)
        return isolation_forest_pred[0], one_class_svm_pred[0]
    
    def save_model(self):
        model_data = {
            'isolation_forest': self.isolation_forest,
            'one_class_svm': self.one_class_svm,
            'scaler': self.scaler
        }
        with open(self.model_path, 'wb') as file:
            pickle.dump(model_data, file)
        logging.info("模型已保存到文件")
    
    def load_model(self):
        if os.path.exists(self.model_path):
            with open(self.model_path, 'rb') as file:
                model_data = pickle.load(file)
                self.isolation_forest = model_data['isolation_forest']
                self.one_class_svm = model_data['one_class_svm']
                self.scaler = model_data['scaler']
                self.model_trained = True
                logging.info("模型已从文件加载")

# 动态黑名单
def update_blacklist(ip_address):
    blacklist_path = os.path.join('temp', 'blacklist.conf')
    with open(blacklist_path, 'a') as file:
        file.write(f"{ip_address}\n")
    subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=in", "action=block", "remoteip=" + ip_address])
    subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=out", "action=block", "remoteip=" + ip_address])

# 部署蜜罐
def deploy_honeypot(port):
    def handle_client(client_socket):
        client_socket.send(b"Welcome to the honeypot!")
        client_socket.close()
    
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('0.0.0.0', port))
    server.listen(5)
    print(f"Honeypot listening on port {port}")
    logging.info(f"Honeypot listening on port {port}")
    
    while True:
        client_socket, addr = server.accept()
        print(f"Connection from {addr}")
        logging.info(f"Connection from {addr}")
        threading.Thread(target=handle_client, args=(client_socket,)).start()

# 动态端口跳跃
def dynamic_port_jumping(service_port):
    new_port = random.randint(1024, 65535)
    subprocess.run(["netsh", "interface", "portproxy", "add", "v4tov4", "listenport=" + str(service_port), "connectport=" + str(new_port), "connectaddress=127.0.0.1"])
    print(f"Service port {service_port} redirected to {new_port}")
    logging.info(f"Service port {service_port} redirected to {new_port}")

# 模拟攻击
def simulate_attack(ip_address):
    for _ in range(10):
        subprocess.run(["ping", "-n", "1", ip_address])

# 有限反击增强
def enhanced_limited_counterattack(ip_address):
    print(f"对 IP 地址 {ip_address} 进行增强有限反击...")
    logging.info(f"对 IP 地址 {ip_address} 进行增强有限反击...")
    update_blacklist(ip_address)
    deploy_honeypot(random.randint(1024, 65535))
    dynamic_port_jumping(22)
    simulate_attack(ip_address)

# 数据流识别与转化
def transform_traffic(packets):
    transformed_packets = []
    for packet in packets:
        if packet.haslayer(scapy.IP):
            packet[scapy.IP].src = "192.168.1.100"  # 模拟正常流量
            packet[scapy.IP].dst = "192.168.1.101"
        transformed_packets.append(packet)
    return transformed_packets

# 蜜罐虚拟小世界空间
def create_advanced_honeypot(ip_address):
    sandbox_dir = create_sandbox(ip_address)
    honeypot_port = random.randint(1024, 65535)
    threading.Thread(target=deploy_honeypot, args=(honeypot_port,)).start()
    return sandbox_dir, honeypot_port

# 对冲数据流反击
def release_counter_traffic(packets):
    transformed_packets = transform_traffic(packets)
    for packet in transformed_packets:
        scapy.sendp(packet, iface="以太网")  # 替换为实际的网络接口名称

# 失控次级蜜罐的定位、突破、引爆
def explode_honeypot(ip_address, sandbox_dir, honeypot_port):
    print(f"引爆蜜罐 {ip_address}:{honeypot_port}...")
    logging.info(f"引爆蜜罐 {ip_address}:{honeypot_port}...")
    subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=in", "action=block", "remoteip=" + ip_address])
    subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=out", "action=block", "remoteip=" + ip_address])

# 高级防护机制
def advanced_protection(ip_address, data_stream):
    print(f"对 IP 地址 {ip_address} 进行高级防护...")
    logging.info(f"对 IP 地址 {ip_address} 进行高级防护...")
    # 复制数据流
    copied_data = data_stream.copy()
    
    # 反向释放对冲数据流
    release_counter_traffic(copied_data)
    
    # 模拟学习入侵者攻击方法
    def simulate_attack(data):
        # 模拟学习过程
        model = Sequential()
        model.add(Dense(64, input_dim=len(data[0]), activation='relu'))
        model.add(Dropout(0.5))
        model.add(Dense(1, activation='sigmoid'))
        model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
        model.fit(data, np.zeros(len(data)), epochs=10, batch_size=32)
        return model
    
    model = simulate_attack(data_stream)
    
    # 形成沙盒等虚拟小空间进行捕捉、吞噬、拟合、强化、反击
    sandbox_dir, honeypot_port = create_advanced_honeypot(ip_address)
    
    # 发送即将失控的沙箱到对方并进行引爆
    explode_honeypot(ip_address, sandbox_dir, honeypot_port)

# 动态端口监控
def monitor_ports():
    while True:
        open_ports = []
        for conn in psutil.net_connections(kind='inet'):
            if conn.status == psutil.CONN_LISTEN:
                open_ports.append(conn.laddr.port)
        
        # 检查是否有新的未授权端口
        authorized_ports = [22, 80, 443]  # 替换为实际的授权端口列表
        unauthorized_ports = set(open_ports) - set(authorized_ports)
        
        if unauthorized_ports:
            for port in unauthorized_ports:
                print(f"检测到未授权端口: {port}")
                logging.warning(f"检测到未授权端口: {port}")
                # 关闭未授权端口
                subprocess.run(["netsh", "interface", "portproxy", "delete", "v4tov4", "listenport=" + str(port)])
                subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockUnauthorizedPort", "dir=in", "action=block", "protocol=TCP", "localport=" + str(port)])
                subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockUnauthorizedPort", "dir=out", "action=block", "protocol=TCP", "localport=" + str(port)])
        
        # 休眠一段时间后再次检查
        time.sleep(60)

# 获取网络连接信息
def get_network_connections():
    connections = []
    for conn in psutil.net_connections(kind='inet'):
        if conn.raddr and conn.laddr:
            sent_bytes = conn.bytes_sent / 3  # 假设每3秒更新一次
            recv_bytes = conn.bytes_recv / 3  # 假设每3秒更新一次
            total_bytes = (conn.bytes_sent + conn.bytes_recv) / 3  # 假设每3秒更新一次
            connections.append({
                'pid': conn.pid,
                'laddr': f"{conn.laddr.ip}:{conn.laddr.port}",
                'raddr': f"{conn.raddr.ip}:{conn.raddr.port}",
                'status': conn.status,
                'sent_bytes': sent_bytes,
                'recv_bytes': recv_bytes,
                'total_bytes': total_bytes,
                'process': psutil.Process(conn.pid).name() if conn.pid else 'Unknown'
            })
    return connections

# 主函数
class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("安全监控系统")
        
        # 创建 TabWidget
        self.tab_widget = QTabWidget()
        self.setCentralWidget(self.tab_widget)
        
        # 创建实时监控标签页
        self.monitoring_tab = QWidget()
        self.tab_widget.addTab(self.monitoring_tab, "实时监控")
        
        # 创建日志查看标签页
        self.log_tab = QWidget()
        self.tab_widget.addTab(self.log_tab, "日志查看")
        
        # 创建沙箱管理标签页
        self.sandbox_tab = QWidget()
        self.tab_widget.addTab(self.sandbox_tab, "沙箱管理")
        
        # 创建反击操作标签页
        self.counterattack_tab = QWidget()
        self.tab_widget.addTab(self.counterattack_tab, "反击操作")
        
        # 创建网络连接监控标签页
        self.network_tab = QWidget()
        self.tab_widget.addTab(self.network_tab, "网络连接监控")
        
        # 实时监控标签页
        self.listbox = QListWidget()
        self.refresh_button = QPushButton("刷新可疑活动")
        self.refresh_button.clicked.connect(self.refresh_suspicious_activities)
        
        layout = QVBoxLayout()
        layout.addWidget(self.listbox)
        layout.addWidget(self.refresh_button)
        self.monitoring_tab.setLayout(layout)
        
        # 日志查看标签页
        self.log_text = QLabel("日志内容")
        self.report_button = QPushButton("显示报告")
        self.report_button.clicked.connect(self.show_report)
        
        layout = QVBoxLayout()
        layout.addWidget(self.log_text)
        layout.addWidget(self.report_button)
        self.log_tab.setLayout(layout)
        
        # 沙箱管理标签页
        self.sandbox_listbox = QListWidget()
        self.refresh_sandboxes_button = QPushButton("刷新沙箱列表")
        self.refresh_sandboxes_button.clicked.connect(self.refresh_sandboxes)
        
        layout = QVBoxLayout()
        layout.addWidget(self.sandbox_listbox)
        layout.addWidget(self.refresh_sandboxes_button)
        self.sandbox_tab.setLayout(layout)
        
        # 反击操作标签页
        self.counterattack_entry = QLineEdit()
        self.counterattack_button = QPushButton("执行反击")
        self.counterattack_button.clicked.connect(self.perform_counterattack)
        
        layout = QVBoxLayout()
        layout.addWidget(self.counterattack_entry)
        layout.addWidget(self.counterattack_button)
        self.counterattack_tab.setLayout(layout)
        
        # 网络连接监控标签页
        self.tree = QTreeView()
        self.refresh_connections_button = QPushButton("刷新连接")
        self.refresh_connections_button.clicked.connect(self.refresh_connections)
        
        layout = QVBoxLayout()
        layout.addWidget(self.tree)
        layout.addWidget(self.refresh_connections_button)
        self.network_tab.setLayout(layout)
        
        # 初始化太极护盾
        self.taiji_shield = TaiJiShield()
        self.taiji_shield.load_model()
        
        # 启动后台任务
        self.background_thread = BackgroundThread()
        self.background_thread.suspicious_activities_signal.connect(self.update_suspicious_activities)
        self.background_thread.start()
    
    def refresh_suspicious_activities(self):
        log_file = os.path.join('C:', 'Windows', 'System32', 'winevt', 'Logs', 'Security.evtx')
        if not os.path.exists(log_file):
            raise FileNotFoundError(f"日志文件 {log_file} 不存在")
        
        global suspicious_activities
        suspicious_activities = analyze_logs(log_file)
        self.listbox.clear()
        for activity in suspicious_activities:
            geolocation = get_geolocation(activity[1], geolocator_cache)
            description = verify_ip(activity[1], whois_cache)
            self.listbox.addItem(f"{activity[0]} - {activity[1]} - {geolocation} - {description} - {activity[2]}")
    
    def show_report(self):
        report_file = os.path.join('temp', 'suspicious_activities_report.txt')
        with open(report_file, 'r') as file:
            report_content = file.read()
        self.log_text.setText(report_content)
    
    def refresh_sandboxes(self):
        self.sandbox_listbox.clear()
        for activity in suspicious_activities:
            ip_address = activity[1]
            sandbox_dir = os.path.join('temp', f'sandbox_{ip_address}')
            if os.path.exists(sandbox_dir):
                self.sandbox_listbox.addItem(f"{ip_address} - {sandbox_dir}")
    
    def perform_counterattack(self):
        ip_address = self.counterattack_entry.text()
        response = QMessageBox.question(self, "反击操作", f"您确定要对 IP 地址 {ip_address} 进行反击吗?", QMessageBox.Yes | QMessageBox.No)
        if response == QMessageBox.Yes:
            enhanced_limited_counterattack(ip_address)
            QMessageBox.information(self, "反击成功", f"已对 IP 地址 {ip_address} 进行反击。")
    
    def refresh_connections(self):
        model = QFileSystemModel()
        model.setRootPath('')
        self.tree.setModel(model)
        self.tree.setRootIndex(model.index(''))
    
    def update_suspicious_activities(self, activities):
        self.refresh_suspicious_activities()

class BackgroundThread(QThread):
    suspicious_activities_signal = pyqtSignal(list)
    
    def run(self):
        # 抓取流量
        interface = "以太网"  # 替换为实际的网络接口名称
        global packets
        packets = capture_traffic(interface, 22)
        
        # 配置防火墙
        configure_firewall()
        
        # 生成报告
        report_file = os.path.join('temp', 'suspicious_activities_report.txt')
        os.makedirs(os.path.dirname(report_file), exist_ok=True)
        generate_report(suspicious_activities, report_file, geolocator_cache, whois_cache)
        
        # 提取特征并训练模型
        features = []
        for activity in suspicious_activities:
            ip_address = activity[1]
            geolocation = get_geolocation(ip_address, geolocator_cache)
            description = verify_ip(ip_address, whois_cache)
            features.append([ip_address, geolocation, description])
        
        # 将特征转换为数值
        features = [[hash(ip), hash(geo), hash(desc)] for ip, geo, desc in features]
        taiji_shield.incremental_train(features)
        
        # 启动动态端口监控
        threading.Thread(target=monitor_ports).start()
        
        self.suspicious_activities_signal.emit(suspicious_activities)

if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())

三、代码功能及改进方向

1.代码功能

(1)日志分析

功能: 读取和解析系统日志文件,提取可疑活动。
实现: 使用 Evtx 库解析 Windows 安全日志文件,提取包含 IPTables-Input、IPTables-Output 和 Security-Audit 的日志条目。

(2)网络流量监控

功能: 抓取指定端口的网络流量,进行实时监控。
实现: 使用 Scapy 库抓取网络流量,支持动态获取网络接口名称。

(3)地理位置和 IP 验证

功能: 获取入侵者的地理位置和 IP 地址的详细信息。
实现: 使用 geopy 库获取地理位置,使用 whois 库验证 IP 地址。

(4)报告生成

功能: 生成可疑活动报告,记录时间戳、IP 地址、地理位置、描述和日志条目。
实现: 将分析结果写入文本文件,方便查看和存档。

(5)沙箱管理

功能: 为可疑 IP 地址创建隔离的沙箱环境,防止潜在威胁。
实现: 使用 os 库创建沙箱目录,并模拟多线程和多核处理。

(6)自适应防护机制(太极护盾)

功能: 使用机器学习模型(Isolation Forest 和 One-Class SVM)进行异常检测,并支持增量训练。
实现: 训练模型并保存到文件,支持增量训练以提高模型的准确性和适应性。

(7)动态黑名单

功能: 将已知恶意 IP 地址添加到黑名单,并配置防火墙规则进行阻断。
实现: 使用 subprocess 库配置 Windows 防火墙规则。

(8)蜜罐部署

功能: 部署蜜罐以吸引和分析攻击行为。
实现: 使用 socket 库创建监听端口,模拟简单的响应。

(9)动态端口跳跃

功能: 动态改变服务端口,增加攻击难度。
实现: 使用 subprocess 库配置端口代理。

(10)模拟攻击

功能: 模拟简单的网络攻击,用于测试和验证系统反应。
实现: 使用 subprocess 库发送 ICMP 请求。

(11)有限反击增强

功能: 对可疑 IP 地址进行增强的反击操作,包括更新黑名单、部署蜜罐、动态端口跳跃和模拟攻击。
实现: 综合使用上述功能,提供多层次的反击手段。

(12)数据流识别与转化

功能: 识别和转化网络数据流,模拟正常流量。
实现: 使用 Scapy 库修改数据包的源和目标地址。

(13)高级防护机制

功能: 对可疑 IP 地址进行高级防护,包括反向释放对冲数据流、模拟学习攻击方法、形成虚拟沙箱和引爆蜜罐。
实现: 综合使用上述功能,提供全面的防护措施。

(14)动态端口监控

功能: 监控系统开放的端口,检测和阻止未授权端口。
实现: 使用 psutil 库获取网络连接信息,配置防火墙规则。

(15)网络连接监控

功能: 显示当前系统的网络连接信息,包括进程、PID、本地地址、远程地址、状态和传输字节数。
实现: 使用 psutil 库获取网络连接信息,通过 QTreeView 显示。

(16)用户界面

功能: 提供图形用户界面,方便用户操作和查看系统状态。
实现: 使用 PyQt5 库创建图形用户界面,支持多个标签页,包括实时监控、日志查看、沙箱管理、反击操作和网络连接监控。

2.改进方向

(1)动态获取网络接口名称

当前实现: 硬编码了网络接口名称为“以太网”。
改进方向: 使用 Scapy 或其他库动态获取当前系统的网络接口名称,并在配置中选择合适的接口。

(2)性能优化

当前实现: 模拟多线程和多核处理较为简单。
改进方向: 使用更高效的计算方法或并行计算库(如 multiprocessing)进行优化。

(3)用户界面优化

当前实现: 当前的 GUI 较为基础。
改进方向: 使用更高级的 GUI 库(如 PyQt 或 Kivy),增加图表、动画等元素,提升用户体验。

(4)安全性增强

当前实现: 反击措施较为简单。
改进方向: 引入更多的安全工具和库,实现更高级的反击措施,如释放病毒、强制断开连接等。

(5)模型训练和迭代

当前实现: 模型在每次启动时重新训练。
改进方向: 在每次检测到新的可疑活动时进行增量训练,以提高模型的准确性和适应性。

(6)日志文件解析

当前实现: 只解析包含 IPTables-Input、IPTables-Output 和 Security-Audit 的日志条目。
改进方向: 增加更多的正则表达式匹配条件,解析不同类型的安全日志。

(7)自动化和脚本化

当前实现: 手动操作较多。
改进方向: 增加自动化脚本,减少手动干预,提高效率。

(9)多平台支持

当前实现: 主要针对 Windows 10。
改进方向: 增加对其他操作系统的支持,如 Linux 和 macOS。

(9)日志记录和报警

当前实现: 基本的日志记录功能。
改进方向: 增加更详细的日志记录,支持日志归档和报警功能,如通过邮件或短信通知管理员。

(10)模块化和可扩展性

当前实现: 功能较为集中。
改进方向: 将各个功能模块化,便于维护和扩展,支持插件化设计。
通过这些改进方向,可以进一步提升系统的性能、安全性和用户体验,使其成为一个更加全面和强大的安全监控和防护工具。

3.注意事项

防火墙规则配置: 确保用户具有管理员权限,否则防火墙规则配置可能失败。
日志文件解析: 确保 Security.evtx 文件存在并且可读。
网络连接监控: 确保 psutil 库已正确安装。
蜜罐和沙箱: 确保 scapy 库已正确安装,并且用户具有足够的权限捕获网络流量。
机器学习模型: 确保 scikit-learn 和 tensorflow 库已正确安装,并且 GPU 配置正确(如果使用 GPU)。
日志记录: 确保日志文件路径正确,并且用户具有写入权限。
希望这些改进和增强措施能够帮助你更好地保护系统安全。如果有任何进一步的问题或需要更多帮助,请随时告诉我。

标签:1.0,Python,ip,self,file,address,安全监控,data,port
From: https://blog.csdn.net/weixin_54366286/article/details/143143556

相关文章

  • python内置函数大全
    文章目录一、数学运算相关二、类型转换相关三、序列操作相关四、对象操作相关五、反射操作相关六、输入输出相关七、文件操作相关八、代码编译执行相关九、装饰器相关十、其他Python的内置函数是Python提供的一系列可以直接使用的函数,这些函数涵盖了数学运算、类型......
  • Python OpenCV图像复原
    文章目录一、理论背景二、去噪方法三、具体实现步骤四、模糊处理(可选)五、注意事项PythonOpenCV图像复原是一个涉及去除噪声、模糊等失真的过程,旨在恢复图像的原始质量。以下是一个详细的案例教程,包括理论背景和具体实现步骤。一、理论背景图像噪声:图像噪声是图......
  • Python的标准库heapq模块的介绍和简单应用
    文章目录1.堆的基本概念2.`heapq`模块的基本使用2.1创建堆2.2插入元素2.3弹出元素3.其他重要函数3.1`heappushpop`3.2`heapreplace`3.3`nlargest`和`nsmallest`3.4`merge`4.堆的应用场景4.1优先队列4.2堆排序5.结论heapq是Python标准库中一个非......
  • Python玫瑰花
    1.安装(cmd命令)pipinstallturtle2.源码importturtle#设置初始位置turtle.penup()turtle.left(90)turtle.fd(200)turtle.pendown()turtle.right(90)#花蕊turtle.fillcolor("red")turtle.begin_fill()turtle.circle(10,180)turtle.circle(25,110)turt......
  • 【Python中的匿名函数】如何高效使用lambda表达式!
    Python中的匿名函数:如何高效使用lambda表达式Python中的匿名函数,也被称为lambda表达式,是一种简洁的函数定义方式。它们在某些场景中能够显著简化代码结构,提升可读性和代码执行效率。本文将详细讨论lambda表达式的使用方法、优缺点、适用场景以及使用技巧,帮助你更高效地应用......
  • 【探讨Python中的浅拷贝与深拷贝】如何避免共享引用带来的问题!
    探讨Python中的浅拷贝与深拷贝:如何避免共享引用带来的问题在Python编程中,拷贝(Copy)是一个常见的操作,尤其在数据处理、对象传递等情况下,经常会涉及数据的复制操作。浅拷贝和深拷贝的概念对于了解如何复制对象而不影响原始对象至关重要。本文将深入讨论这两种拷贝的原理、区别......
  • 计算机毕业设计Python+大模型微博情感分析 微博舆情预测 微博爬虫 微博大数据 舆情分
    温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!《Python+大模型微博情感分析》开题报告一、研究背景与意义随着互联网技术的飞速发展,社交媒体平台......
  • 计算机毕业设计Python+大模型租房推荐系统 租房大屏可视化 租房爬虫 hadoop spark 58
    温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!用到的技术:  1.python  2.django后端框架  3.django-simpleui,Django后台  4.......
  • Python实现微博舆情分析的设计与实现
    引言随着互联网的发展,社交媒体平台如微博已经成为公众表达意见、分享信息的重要渠道。微博舆情分析旨在通过大数据技术和自然语言处理技术,对微博上的海量信息进行情感分析、热点挖掘和趋势预测,为政府、企业和研究机构提供决策支持。本文将详细介绍如何使用Python实现微博舆情分析......
  • python 访问openai接口
    目录一、openai接口文档1.访问OpenAIAPI文档2.注册和获取API密钥3.快速开始:示例代码4.请求结构和响应格式二、步骤1、安装openai库2、示例代码实现一个命令行循环对话机器人加入gradio界面demo一、openai接口文档使用OpenAIAPI文档可以帮助你更好地......