首页 > 数据库 >探索sqlmap在WebSocket安全测试中的应用

探索sqlmap在WebSocket安全测试中的应用

时间:2024-06-04 13:44:02浏览次数:17  
标签:sqlmap HTTP 测试 url keys dict key WebSocket

探索sqlmap在WebSocket安全测试中的应用

WebSocket与HTTP的区别

WebSocket,对于初次接触的人来说,往往会引发一个疑问:既然我们已经有了广泛使用的HTTP协议,为何还需要引入另一种协议?WebSocket又能为我们带来哪些实质性的好处呢?

这背后的答案在于HTTP协议的一个关键限制——通信的发起权始终掌握在客户端手中。简单来说,如果你想知道今天的天气,你(客户端)需要主动向天气服务(服务器)发送请求,而后服务器才会返回你所需的信息。但HTTP协议并不支持服务器主动推送信息给客户端,这在某些应用场景下显得尤为不便。

想象一下,如果你正在参与一个活跃的聊天室,想要实时获取其他用户的消息,使用HTTP协议意味着你需要不断地发起请求来“轮询”服务器,看是否有新的消息产生。这种方式不仅效率低下,还极大地浪费了资源,因为客户端需要不断建立和维护与服务器的连接。

为了解决这个问题,WebSocket协议应运而生。它的出现彻底打破了HTTP协议的局限,实现了真正的双向通信——服务器可以主动推送信息给客户端,而客户端也能随时向服务器发送数据。这种双向、平等、实时的通信模式,使得WebSocket成为了一种高效的服务器推送技术,广泛应用于实时聊天、股票交易、在线游戏等多种场景。

image-20240604101030616

SQLmap简介

SQLMap是一个功能强大的自动化SQL注入工具,主要用于扫描、发现并利用给定的URL中的SQL注入漏洞。

SQLMap主要设计用于测试HTTP(S)协议下的Web应用程序的SQL注入漏洞。它基于HTTP请求和响应来检测和分析SQL注入的可能性。这意味着,SQLMap并不直接支持非HTTP协议的数据库访问或测试,如通过TCP/IP直接连接数据库服务器。

WebSocket转HTTP进行SQLmap扫描

由于SQLMap仅支持HTTP(S)协议,直接用它来扫描WebSocket接口是不可行的。为了克服这一限制,我们可以设计一个中间代理解决方案。具体来说,可以开发一个自定义脚本作为中间层,负责将SQLMap生成的HTTP请求转换为WebSocket消息,并将这些消息发送到WebSocket服务器。同样,这个脚本还需要捕获来自WebSocket服务器的响应,将其转换为HTTP响应格式,然后返回给SQLMap。

需要安装 websocket-client

pip install websocket-client

工具脚本如下:

from datetime import datetime
# 导入创建WebSocket连接的库
from websocket import create_connection
# 导入HTTP服务器的基础类和HTTPServer类
from http.server import BaseHTTPRequestHandler, HTTPServer
# 导入用于多线程HTTP服务器的类
from socketserver import ThreadingMixIn
from urllib.parse import unquote
import json
from urllib.parse import urlparse, parse_qs


# 定义一个函数,用于发送消息到WebSocket服务器并接收响应
def send_msg(url, date):
    # 创建一个WebSocket连接
    ws = create_connection(url, header=extra_headers)
    # 接收WebSocket服务器的初始响应
    resp = ws.recv()
    print(f'====================={datetime.now().strftime("%H:%M:%S")}=====================')
    print(resp)
    # 构造要发送的JSON字符串
    d = date
    # 打印要发送的JSON字符串
    print('JSON + + + + + + + + + +')
    print(d)
    # 发送JSON字符串到WebSocket服务器
    ws.send(d)
    # 接收WebSocket服务器的响应
    resp = ws.recv()
    print('RESP + + + + + + + + + +')
    print(resp)
    print('==================================================')
    return resp


# 定义一个HTTP请求处理器类
class Handler(BaseHTTPRequestHandler):

    # 处理GET请求的方法
    def do_GET(self):
        # 设置HTTP响应状态码为200(成功)
        self.send_response(200)
        # 过滤掉其他请求影响
        if str(self.path[:3]) == '/ws':
            url_ws = f'http://{hostname}:{str(serverport)}/ws?' + unquote(self.path[4:])
            dist_params = parses(url_ws)
            dates = data_dict
            key = type_conversion(dist_params, dates)
            update_nested_dict(dates, key)
            par_date = str(dates).replace("'", '"')
            # 设置响应的内容类型为文本
            self.send_header('Content-Type', 'text')
            # 结束HTTP头部信息的发送
            self.end_headers()
            # 调用send_msg函数发送参数并接收响应,然后将响应写入HTTP响应体
            resp = send_msg(ws_url, par_date)
            self.wfile.write(bytes(resp, "utf-8"))


# 定义一个继承自HTTPServer并支持多线程的服务器类
class ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
    pass


# 定义一个函数,用于启动HTTP服务器
def run():
    # 创建一个服务器实例,监听所有IP地址的9000端口,并使用Handler类处理请求
    server = ThreadingSimpleServer((hostname, serverport), Handler)

    try:
        # 让服务器一直运行,直到接收到键盘中断信号
        server.serve_forever()
    except KeyboardInterrupt:
        pass


# 递归函数来提取所有的key
def extract_keys(d, parent_key='', keys=None):
    if keys is None:
        keys = []
    if isinstance(d, dict):
        for k, v in d.items():
            new_key = k if not parent_key else f"{parent_key}.{k}"
            if isinstance(v, dict):
                extract_keys(v, new_key, keys)
            else:
                keys.append(new_key)
    return keys


# 获取所有values
def extract_values(d, values=None):
    if values is None:
        values = []
    if isinstance(d, dict):
        for k, v in d.items():
            if isinstance(v, dict):
                # 如果是嵌套字典或列表,则递归调用
                if isinstance(v, dict):
                    values.extend(extract_values(v))
                elif isinstance(v, list):
                    for item in v:
                        if isinstance(item, (dict, list)):
                            # 如果是嵌套在列表中的字典或列表,继续递归
                            values.extend(extract_values(item))
                        else:
                            # 如果列表中的项不是字典或列表,则直接添加
                            values.append(item)
            else:
                # 如果值不是字典或列表,则直接添加
                values.append(v)
    return values


def parses(ws_url):
    # 使用urlparse解析URL
    parsed_url = urlparse(ws_url)

    # 获取查询字符串
    query_string = parsed_url.query

    # 使用parse_qs解析查询字符串为字典(注意:如果有多个相同key的参数,值会是列表)
    params = parse_qs(query_string)
    return params


# 类型转换
def type_conversion(dist_p, ys_date):
    for key in dist_p:
        path = key.split('.')
        val = ys_date
        for i in path:
            val = val[i]
        try:
            if isinstance(val, str):
                dist_p[key] = str(dist_p[key][0])
            elif isinstance(val, int):
                dist_p[key] = int(dist_p[key][0])
            elif isinstance(val, float):
                dist_p[key] = float(dist_p[key][0])
            elif isinstance(val, list):
                dist_p[key] = list(dist_p[key][0][1:-1])
        except:
            dist_p[key] = str(dist_p[key][0])
    return dist_p


def update_nested_dict(d, u):
    for key, value in u.items():
        if '.' in key:
            # 分解嵌套路径
            keys = key.split('.')
            sub_dict = d
            for k in keys[:-1]:
                sub_dict = sub_dict.setdefault(k, {})
                # 更新最后一个键的值
            sub_dict[keys[-1]] = value
        else:
            # 直接更新非嵌套键
            d[key] = value


if __name__ == '__main__':
    # http服务ip+端口
    hostname = "localhost"
    serverport = 9909

    # WebSocket接口url
    ws_url = input('Url: ')
    # 发送内容(数据类型为字符串的字典)
    send_text = input('\nDate: ')
    # WebSocket请求头
    extra_headers = {
        'Sec-Websocket-Protocol': input('\nSec-Websocket-Protocol: ')
    }
    # json格式化请求数据
    data_dict = json.loads(send_text)
    # 获取所有的请求数据中所有的key和value
    all_keys = extract_keys(data_dict)
    all_values = extract_values(data_dict)
    # 根据key和value拼接初始url
    url = f'{hostname}:{str(serverport)}/ws?'
    for i in range(len(all_keys)):
        url = url + str(all_keys[i]) + '=' + str(all_values[i]) + '&'
        if i+1 == len(all_keys):
            url = url[:-1]
    # 输出所访问的url
    print("\n" + url)

    run()

当您运行此程序后,您将被要求输入WebSocket接口的URL、要传输的数据以及Sec-WebSocket-Protocol头部信息(您可以根据项目需求自行调整相关代码,位于168行到175行之间的部分)。随后,程序将返回与WebSocket连接建立过程中使用的HTTP协议的初始URL。

这个URL的生成是基于传输数据的JSON格式进行解析而得到的。当您变更URL中的参数值时,与该参数相对应的JSON数据中的值也会相应地进行调整,并随后被发送到WebSocket接口。所以具备了sqlmap扫描参数的条件,可以通过sqlmap -u "localhost:9909/ws?key1=value1&key2=&value2"命令来进行sqlmap扫描。

image-20240604104633544

当您通过浏览器访问指定的WebSocket接口时,您不仅能够成功地接收到返回的数据,而且在浏览器的控制台中还能清晰地看到发送的请求数据以及相应的响应结果。这一流程确保了数据传输的透明性和可追踪性,为您提供了更加直观和便捷的调试与监控体验。

image-20240604105804672

image-20240604110025306

SQLmap扫描

使用以下命令进行扫描

python sqlmap.py -u "ulr"

image-20240604110946850

扫描中

image-20240604133549966

标签:sqlmap,HTTP,测试,url,keys,dict,key,WebSocket
From: https://www.cnblogs.com/test-gang/p/18230593

相关文章

  • SQLmap常用命令使用详解
    SQLmap使用详解 一、SQLmap常规使用步骤1、查询注入点pythonsqlmap.py-v3-uhttp://127.0.0.1/sqli-labs/Less-1/?id=12、查询所有数据库pythonsqlmap.py-v3-uhttp://127.0.0.1/sqli-labs/Less-1/?id=1--dbs3、查询当前数据库pythonsqlmap.py......
  • C/C++结构体对齐测试
    #include<stddef.h>#include<iostream>structs1{inta;intb;};#pragmapack(8)structs2{charc;inta;doubleb;};structs3{charb[10];doublea;};#pragmaunpackstructs4{c......
  • allure的suites(测试套)中未显示返回值参数,显示No information about test execution is
    转自大佬:https://blog.csdn.net/sbdxmnz/article/details/137016423 ExecutionNoinformationabouttestexecutionisavailable.  解决方法:添加代码,因为pytest输出文本形式测试报告时未存储响应内容#将接口响应的文本内容附加到Allure报告中allure.attach(接口响......
  • 图形学初识--深度测试
    文章目录前言正文为什么要有深度测试?画家算法循环遮挡深度测试当代最常见实现方式?总述什么是z-buffer呢?z-buffer从哪来呢?如何利用z-buffer实现深度测试?举个例子结尾:喜欢的小伙伴点点关注+赞哦!前言本章节补充一下深度测试的内容,主要包含:为什么要有深度测试?深度测......
  • Helm 图表在调用测试(test-connection.yml)时出现任何错误,如何在 NOTES.txt 中显示错误
    下面是我的test-connection.ymlapiVersion:v1kind:Pod元数据:name:"{{include"demohelmapi.fullname".}}-test-connection";labels:{{-include"demohelmapi.labels".|nindent4}}annotations:"helm.sh/hook&qu......
  • 和GPT-4这些大模型玩狼人杀,人类因太蠢被票死,真·反向图灵测试
    最近,一位昵称「ToreKnabe」的网友在X平台发布的一段视频引发了人们的讨论。这是一次「反向图灵测试」,几个全球最先进的大模型坐在一起,坐着火车唱着歌,但其中混进了人类:而AI的任务,是把这个人类揪出来。最近,一位昵称「ToreKnabe」的网友在X平台发布的一段视频引发了......
  • ruoyi若依整合websocket
    原文链接:https://blog.csdn.net/qq_33342112/article/details/132096930注:本文档中的ruoyi框架为前后不分离版本,nginx配置与前后分离版有所不同。一、导pom,版本需与springboot版本一致<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot......
  • 【测试开发】api开发神器-第1篇-magic-api后端低代码框架
    #Magic-api介绍(低代码后端框架) magic-api是一个基于Java的接口快速开发框架。使用magic-api,彻底摆脱手动编写繁杂的Controller、Service、Dao、Mapper、XML、VO等Java对象,而是通过在线可视化UI界面,在线编写接口(动态脚本,即写即用)。通过magic-api,我们可以快速而轻松......
  • 浏览器 performance 测试页面性能,查看阻塞代码
    截图黄色指的js执行紫色指的浏览器渲染一般只需要看js和render这2个就行,其他不用看,并从这2个中看出哪里的代码导致阻塞线程比较久。main线程推测main线程就是浏览器的主线程,主要负责浏览器的渲染和js代码执行,由此可见,浏览器用于渲染和js执行是一个线程,也就是主线程。渲......
  • 测试管理-测试过程监控
    测试活动的监控,对于整体测试工程而言是非常重要的管理内容。测试工作本身是非常依赖项目其他环节的,测试活动的进行充满了变数。所以对测试的实行情况进行持续的监控和做出及时应对,是管好一个测试项目的必要工作。 测试的监控是一个贯穿于整个测试周期内的工作。在一些情况......