当前位置: 首页 > news >正文

steal tsoding‘s pastebeam code as go server

一、整体架构

  • 服务器:用Go语言实现,基于TLS加密通信,负责接收客户端连接、验证文件内容、处理POW挑战、存储文件。
  • 客户端:用Python实现,负责读取本地文件、与服务器建立加密连接、上传内容、完成POW挑战。
  • 核心机制:TLS加密传输 + 自定义文本协议 + 工作量证明(防止垃圾内容)。

1. 初始化阶段(服务器)

  1. 证书加载:服务器启动时加载TLS证书(server.crt)和私钥(server.key),用于加密通信。
  2. TLS监听:在指定端口(默认6969)启动TLS监听,等待客户端连接。
  3. 环境准备:创建文件存储目录(./posts/),用于保存上传的文件。

generate_cert.go

package mainimport ("crypto/rand""crypto/rsa""crypto/x509""crypto/x509/pkix""encoding/pem""math/big""os""time"
)func main() {// 生成RSA私钥privateKey, err := rsa.GenerateKey(rand.Reader, 4096)if err != nil {panic("无法生成私钥: " + err.Error())}// 准备证书模板template := x509.Certificate{SerialNumber: big.NewInt(1),Subject: pkix.Name{Organization: []string{"Pastebeam"},CommonName:   "localhost",},NotBefore:             time.Now(),NotAfter:              time.Now().Add(365 * 24 * time.Hour), // 有效期1年KeyUsage:              x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},BasicConstraintsValid: true,IsCA:                  true,}// 生成证书derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)if err != nil {panic("无法生成证书: " + err.Error())}// 保存证书到文件 (server.crt)certOut, err := os.Create("server.crt")if err != nil {panic("无法打开证书文件: " + err.Error())}defer certOut.Close()if err := pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil {panic("无法写入证书文件: " + err.Error())}// 保存私钥到文件 (server.key)keyOut, err := os.Create("server.key")if err != nil {panic("无法打开私钥文件: " + err.Error())}defer keyOut.Close()privateKeyBytes, err := x509.MarshalPKCS8PrivateKey(privateKey)if err != nil {panic("无法编码私钥: " + err.Error())}if err := pem.Encode(keyOut, &pem.Block{Type: "PRIVATE KEY", Bytes: privateKeyBytes}); err != nil {panic("无法写入私钥文件: " + err.Error())}println("证书生成成功:")println(" - server.crt (证书文件)")println(" - server.key (私钥文件)")
}

pastebeam_server.go

package mainimport ("crypto/rand""crypto/sha256""crypto/tls""encoding/base64""encoding/hex""errors""fmt""net""os""path/filepath""strings""sync""time""unicode/utf8" // 正确的UTF8验证包
)// -------------------------- 配置常量 --------------------------
const (DefaultPort         = 6969DefaultPostsRoot    = "./posts/"PostIDByteSize      = 32 // 64位十六进制IDChallengeLeadingZeros = 4 // 降低难度,加快验证ChallengeTimeoutMs   = 60 * 1000ChallengeByteSize    = 32MaxLimitPerIP        = 5PostByteSizeLimit    = 4 * 1024 // 4KB
)// -------------------------- 全局状态 --------------------------
var (ipConnCount = make(map[string]int) // IP -> 连接数mu          sync.Mutex             // 保护ipConnCountbuf         = make([]byte, 1024)   // 全局缓冲区,避免重复分配
)// -------------------------- 工具函数 --------------------------
// 生成随机Post ID(64位十六进制)
func generatePostID() string {buf := make([]byte, PostIDByteSize)_, err := rand.Read(buf)if err != nil {panic(err)}return hex.EncodeToString(buf)
}// 验证Post ID是否合法(仅十六进制字符)
func isValidPostID(id string) bool {if len(id) != PostIDByteSize*2 {return false}_, err := hex.DecodeString(id)return err == nil
}// 统计IP连接数,判断是否超限
func checkIPLimit(ip string) bool {mu.Lock()defer mu.Unlock()count := ipConnCount[ip]if count >= MaxLimitPerIP {return true // 超限}ipConnCount[ip]++return false
}// 释放IP连接数
func releaseIPLimit(ip string) {mu.Lock()defer mu.Unlock()count := ipConnCount[ip]if count <= 1 {delete(ipConnCount, ip)} else {ipConnCount[ip]--}
}// 验证UTF-8格式(使用正确的unicode/utf8包)
func isValidUTF8(b []byte) bool {return utf8.Valid(b)
}// 统计哈希前导零数量
func countLeadingZeros(hash string) int {count := 0for _, c := range hash {if c == '0' {count++} else {break}}return count
}// -------------------------- 会话处理逻辑 --------------------------
func handleSession(conn net.Conn) {// 获取客户端IPaddr, ok := conn.RemoteAddr().(*net.TCPAddr)if !ok {fmt.Println("invalid client address")conn.Close()return}clientIP := addr.IP.String()fmt.Printf("new connection from %s:%d\n", clientIP, addr.Port)// 检查IP连接数限制if checkIPLimit(clientIP) {conn.Write([]byte("TOO MANY CONNECTIONS\r\n"))fmt.Printf("%s: too many connections\n", clientIP)conn.Close()releaseIPLimit(clientIP)return}defer func() {releaseIPLimit(clientIP)conn.Close()fmt.Printf("connection closed from %s:%d\n", clientIP, addr.Port)}()// 设置读超时(防止客户端挂起)conn.SetReadDeadline(time.Now().Add(30 * time.Second))defer conn.SetReadDeadline(time.Time{}) // 清除超时// 1. 发送欢迎消息conn.Write([]byte("HI\r\n"))// 2. 读取客户端命令n, err := conn.Read(buf)if err != nil {fmt.Printf("%s: read command failed: %v\n", clientIP, err)return}cmd := strings.TrimSpace(string(buf[:n]))switch cmd {case "PARAMS":// 发送公共参数params := fmt.Sprintf("challenge_leading_zeros %d\r\n"+"challenge_timeout_ms %d\r\n"+"challenge_byte_size %d\r\n"+"max_limit_per_ip %d\r\n"+"post_byte_size_limit %d\r\n",ChallengeLeadingZeros,ChallengeTimeoutMs,ChallengeByteSize,MaxLimitPerIP,PostByteSizeLimit,)conn.Write([]byte(params))case "POST":// 处理POST流程handlePost(conn, clientIP)case "GET":// 发送确认,告知客户端可以发送 Post IDconn.Write([]byte("OK\r\n"))// 读取客户端发送的 Post ID(设置超时)conn.SetReadDeadline(time.Now().Add(10 * time.Second))n, err := conn.Read(buf)if err != nil {if errors.Is(err, os.ErrDeadlineExceeded) {conn.Write([]byte("TIMEOUT\r\n"))fmt.Printf("%s: GET ID timeout\n", clientIP)} else {conn.Write([]byte("400\r\n"))fmt.Printf("%s: read GET ID failed: %v\n", clientIP, err)}return}id := strings.TrimSpace(string(buf[:n]))handleGet(conn, clientIP, id)case "CRASH":// 触发崩溃(测试用)panic("client requested crash")default:conn.Write([]byte("INVALID COMMAND\r\n"))fmt.Printf("%s: invalid command: %s\n", clientIP, cmd)}
}// 处理POST上传逻辑
func handlePost(conn net.Conn, clientIP string) {// 1. 响应POST命令conn.Write([]byte("OK\r\n"))fmt.Printf("%s: start POST upload\n", clientIP)// 2. 读取上传内容(直到SUBMIT)var content []bytefor {conn.SetReadDeadline(time.Now().Add(30 * time.Second))n, err := conn.Read(buf)if err != nil {fmt.Printf("%s: read POST content failed: %v\n", clientIP, err)conn.Write([]byte("INTERNAL ERROR\r\n"))return}line := buf[:n]// 检查是否提交if strings.TrimSpace(string(line)) == "SUBMIT" {break}// 验证:UTF-8 + 行结尾\r\nif !isValidUTF8(line) {conn.Write([]byte("INVALID UTF8\r\n"))fmt.Printf("%s: invalid UTF8 content\n", clientIP)return}if !strings.HasSuffix(string(line), "\r\n") {conn.Write([]byte("BAD LINE ENDING\r\n"))fmt.Printf("%s: bad line ending (expected \\r\\n)\n", clientIP)return}// 验证大小限制if len(content)+len(line) > PostByteSizeLimit {conn.Write([]byte("TOO BIG\r\n"))fmt.Printf("%s: post too big (exceeds %d bytes)\n", clientIP, PostByteSizeLimit)return}// 追加内容content = append(content, line...)conn.Write([]byte("OK\r\n"))}// 3. 生成POW挑战challengeBuf := make([]byte, ChallengeByteSize)_, err := rand.Read(challengeBuf)if err != nil {conn.Write([]byte("INTERNAL ERROR\r\n"))fmt.Printf("%s: generate challenge failed: %v\n", clientIP, err)return}challenge := base64.StdEncoding.EncodeToString(challengeBuf)// 发送挑战消息challengeMsg := fmt.Sprintf("CHALLENGE sha256 %d %s\r\n", ChallengeLeadingZeros, challenge)conn.Write([]byte(challengeMsg))fmt.Printf("%s: sent challenge: %s\n", clientIP, challenge)// 4. 验证POW解决方案(设置超时)conn.SetReadDeadline(time.Now().Add(time.Duration(ChallengeTimeoutMs) * time.Millisecond))n, err := conn.Read(buf)if err != nil {if errors.Is(err, os.ErrDeadlineExceeded) {conn.Write([]byte("TOO SLOW\r\n"))fmt.Printf("%s: POW timeout\n", clientIP)} else {conn.Write([]byte("INTERNAL ERROR\r\n"))fmt.Printf("%s: read POW solution failed: %v\n", clientIP, err)}return}solution := strings.TrimSpace(string(buf[:n]))if !strings.HasPrefix(solution, "ACCEPTED ") {conn.Write([]byte("INVALID COMMAND\r\n"))fmt.Printf("%s: invalid POW response\n", clientIP)return}prefix := strings.TrimPrefix(solution, "ACCEPTED ")// 5. 验证POW哈希powData := fmt.Sprintf("%s\r\n%s%s\r\n", prefix, string(content), challenge)hash := sha256.Sum256([]byte(powData))hashHex := hex.EncodeToString(hash[:])if countLeadingZeros(hashHex) < ChallengeLeadingZeros {conn.Write([]byte("CHALLENGE FAILED\r\n"))fmt.Printf("%s: POW failed (hash: %s)\n", clientIP, hashHex)return}fmt.Printf("%s: POW success (hash: %s)\n", clientIP, hashHex)// 6. 保存文件postID := generatePostID()postPath := filepath.Join(DefaultPostsRoot, postID)// 确保posts目录存在if err := os.MkdirAll(DefaultPostsRoot, 0755); err != nil {conn.Write([]byte("INTERNAL ERROR\r\n"))fmt.Printf("%s: create posts dir failed: %v\n", clientIP, err)return}// 写入文件if err := os.WriteFile(postPath, content, 0644); err != nil {conn.Write([]byte("INTERNAL ERROR\r\n"))fmt.Printf("%s: write post failed: %v\n", clientIP, err)return}// 7. 返回Post IDconn.Write([]byte(fmt.Sprintf("SENT %s\r\n", postID)))fmt.Printf("%s: post saved, ID: %s\n", clientIP, postID)
}// 处理GET请求逻辑
func handleGet(conn net.Conn, clientIP, id string) {// 验证ID合法性if !isValidPostID(id) {conn.Write([]byte("404\r\n"))fmt.Printf("%s: invalid post ID: %s\n", clientIP, id)return}// 读取文件postPath := filepath.Join(DefaultPostsRoot, id)content, err := os.ReadFile(postPath)if err != nil {if errors.Is(err, os.ErrNotExist) {conn.Write([]byte("404\r\n"))fmt.Printf("%s: post not found: %s\n", clientIP, id)} else {conn.Write([]byte("500\r\n"))fmt.Printf("%s: read post failed: %v\n", clientIP, err)}return}// 发送文件内容conn.Write(content)fmt.Printf("%s: sent post: %s (size: %d bytes)\n", clientIP, id, len(content))
}// -------------------------- 主函数 --------------------------
func main() {// 加载TLS证书(需提前生成server.crt和server.key)cert, err := tls.LoadX509KeyPair("server.crt", "server.key")if err != nil {fmt.Printf("load TLS cert failed: %v\n", err)os.Exit(1)}tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert},MinVersion:   tls.VersionTLS12, // 禁用旧协议}// 启动TLS监听listener, err := tls.Listen("tcp", fmt.Sprintf(":%d", DefaultPort), tlsConfig)if err != nil {fmt.Printf("start TLS listener failed: %v\n", err)os.Exit(1)}defer listener.Close()fmt.Printf("server started: TLS :%d, posts root: %s\n", DefaultPort, DefaultPostsRoot)// 接受连接(循环)for {conn, err := listener.Accept()if err != nil {fmt.Printf("accept connection failed: %v\n", err)continue}// 启动协程处理会话go handleSession(conn)}
}

python client script

get.py

#!/usr/bin/env python3import socket
import ssl
import sys# 配置常量
RECV_SIZE = 4096
TIMEOUT = 15  # 延长超时时间到15秒def check_response(client, expected: bytes) -> None:"""验证服务器响应是否完全符合预期"""client.settimeout(TIMEOUT)try:actual = client.recv(RECV_SIZE)except socket.timeout:raise TimeoutError(f"服务器在{TIMEOUT}秒内未响应")if not actual:raise ConnectionError("服务器意外关闭了连接")if actual != expected:raise ProtocolError(f"响应不匹配: 收到 {actual!r},预期 {expected!r}")def read_response_prefix(client, prefix: bytes) -> bytes:"""读取服务器响应,验证前缀并返回剩余部分"""client.settimeout(TIMEOUT)try:response = client.recv(RECV_SIZE)except socket.timeout:raise TimeoutError(f"服务器在{TIMEOUT}秒内未响应")if not response:raise ConnectionError("服务器意外关闭了连接")if not response.startswith(prefix):raise ProtocolError(f"响应前缀不匹配: 收到 {response!r},预期前缀 {prefix!r}")return response[len(prefix):]def usage(program_name: str) -> None:print(f"Usage: {program_name} <host> <port> <post-id>")print("示例: python get_paste.py localhost 6969 f576933171d2eb19f43b72a5dfb99e7ce67941fe09d9c75cb09c9db556ec6efa")def main() -> None:args = sys.argv[1:]if len(args) != 3:usage(sys.argv[0])sys.exit(1)host, port_str, post_id = argstry:port = int(port_str)except ValueError:print(f"错误: 无效的端口 '{port_str}' (必须是数字)")sys.exit(1)# TLS连接配置context = ssl.create_default_context()context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEtry:with socket.create_connection((host, port), timeout=TIMEOUT) as sock:with context.wrap_socket(sock, server_hostname=host) as client:print(f"✅ 已连接到Pastebeam服务器: {host}:{port}")# 验证欢迎消息check_response(client, b"HI\r\n")# 发送GET命令client.sendall(b"GET\r\n")print(f"📤 已发送命令: GET\r\n")# 等待服务器确认check_response(client, b"OK\r\n")print(f"✅ 服务器接受GET请求,准备发送Post ID")# 发送Post IDclient.sendall(f"{post_id}\r\n".encode("utf-8"))print(f"📤 已发送Post ID: {post_id}")print("------------------------------")# 接收内容client.settimeout(TIMEOUT)content = []while True:data = client.recv(RECV_SIZE)if not data:breakcontent.append(data.decode("utf-8", errors="replace"))  # 容错解码full_content = "".join(content)if full_content == "404\r\n":print(f"❌ 错误: Post ID '{post_id}' 不存在 (404)")elif full_content.startswith("500\r\n"):print(f"❌ 服务器错误: {full_content.strip()}")elif full_content.startswith("TIMEOUT\r\n"):print(f"❌ 错误: 操作超时")elif not full_content:print(f"❌ 错误: 未收到内容")else:print(f"✅ 成功接收内容 (长度: {len(full_content)} 字节):")print("------------------------------")print(full_content)print("------------------------------")print("操作完成")except ConnectionRefusedError:print(f"错误: 无法连接到 {host}:{port} (服务器可能未运行)")sys.exit(1)except TimeoutError as e:print(f"错误: {e}")sys.exit(1)except ConnectionError as e:print(f"错误: {e}")sys.exit(1)except ProtocolError as e:print(f"错误: 协议不匹配: {e}")sys.exit(1)except Exception as e:print(f"错误: 发生意外错误: {str(e)}")sys.exit(1)class ProtocolError(Exception):passif __name__ == "__main__":main()

post.py

#!/usr/bin/env python3import hashlib
from random import randint, randbytes
from base64 import b64encode
import socket
import ssl
import sys
import os
import mathRECV_SIZE = 1024
POW_LIMIT = 100_000_000  # POW最大尝试次数def check_response(client, expected: bytes) -> None:"""验证服务器响应是否符合预期"""client.settimeout(10)try:actual = client.recv(RECV_SIZE)except socket.timeout:raise TimeoutError("server response timeout")assert expected == actual, f"server returned {actual!r} (expected {expected!r})"def check_response_prefix(client, prefix: bytes) -> bytes:"""验证服务器响应前缀,并返回前缀后的内容"""client.settimeout(10)try:response = client.recv(RECV_SIZE)except socket.timeout:raise TimeoutError("server response timeout")assert response.startswith(prefix), f"server returned {response!r} (expected prefix {prefix!r})"return response[len(prefix):].strip()def usage(program_name: str) -> None:"""打印使用说明"""print(f"Usage: {program_name} <host> <port> <file-path>")def main() -> None:args = sys.argv[1:]if len(args) != 3:usage(sys.argv[0])sys.exit(1)host, port_str, file_path = argsport = int(port_str)# 检查文件是否存在if not os.path.isfile(file_path):print(f"ERROR: file {file_path!r} does not exist")sys.exit(1)# 读取文件内容(自动处理所有换行符)with open(file_path, 'r', encoding='utf-8') as f:content = f.read().splitlines()  # 统一分割各种换行符if not content:print("ERROR: file is empty")sys.exit(1)# TLS连接配置context = ssl.create_default_context()context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONE  # 跳过自签名证书验证# 建立连接并上传try:with socket.create_connection((host, port)) as sock:with context.wrap_socket(sock, server_hostname=host) as client:# 1. 验证欢迎消息check_response(client, b"HI\r\n")print(f"✅ Connected to server: {host}:{port}")# 2. 发送POST命令client.sendall(b"POST\r\n")check_response(client, b"OK\r\n")print(f"✅ Server accepts POST request")# 3. 上传文件内容(强制\r\n结尾)total_lines = len(content)bar_len = 30print(f"📤 Uploading {total_lines} lines...")for idx, line in enumerate(content):# 强制添加\r\n结尾,确保符合服务器要求client.sendall(f"{line}\r\n".encode('utf-8'))check_response(client, b"OK\r\n")# 显示进度条progress = (idx + 1) / total_linesfilled = math.floor(progress * bar_len)bar = '#' * filled + '.' * (bar_len - filled)print(f"\r[{bar}] {idx+1}/{total_lines}", end='', flush=True)print(f"\r[{ '#' * bar_len }] {total_lines}/{total_lines} ✅")# 4. 发送提交命令并获取挑战client.sendall(b"SUBMIT\r\n")challenge_data = check_response_prefix(client, b"CHALLENGE ").split()assert len(challenge_data) == 3, f"invalid challenge format: {challenge_data}"hash_func, zeros_str, challenge = challenge_dataassert hash_func == b"sha256", f"unsupported hash: {hash_func!r}"leading_zeros = int(zeros_str.decode('utf-8'))challenge_str = challenge.decode('utf-8')print(f"🔒 POW challenge: {leading_zeros} leading zeros")# 5. 破解POW挑战print(f"🔨 Mining solution (max {POW_LIMIT} attempts)...")counter = 0spinner = "-\\|/"found = Falsewhile counter < POW_LIMIT:# 显示加载动画if counter % 10000 == 0:spin_char = spinner[(counter // 10000) % len(spinner)]print(f"\rTrying... {spin_char} (attempt {counter})", end='', flush=True)# 生成随机前缀prefix_bytes = randbytes(randint(3, 100))prefix = b64encode(prefix_bytes).decode('utf-8')# 拼接POW数据pow_parts = [prefix] + content + [challenge_str, ""]pow_data = "\r\n".join(pow_parts)# 计算哈希hash_obj = hashlib.sha256(pow_data.encode('utf-8'))hash_hex = hash_obj.hexdigest()# 检查前导零zero_count = 0while zero_count < len(hash_hex) and hash_hex[zero_count] == '0':zero_count += 1if zero_count >= leading_zeros:print(f"\rFound solution after {counter + 1} attempts ✅")# 发送解决方案client.sendall(f"ACCEPTED {prefix}\r\n".encode('utf-8'))# 获取Post IDpost_id = check_response_prefix(client, b"SENT ").decode('utf-8')print(f"\n🎉 Upload success! Post ID: {post_id}")found = Truebreakcounter += 1if not found:print(f"\n❌ Failed to solve POW (exceeded {POW_LIMIT} attempts)")sys.exit(1)except ConnectionRefusedError:print(f"ERROR: Could not connect to {host}:{port} (server not running?)")sys.exit(1)except Exception as e:print(f"ERROR: {e}")sys.exit(1)if __name__ == "__main__":main()

测试

  • 自己run一下generate_cert.go生成证书,启动pastebeam server
  • 然后用python脚本post文件,用get脚本获取id以取得文件内容

ref link

  • https://github.com/tsoding/pastebeam
http://www.dtcms.com/a/338274.html

相关文章:

  • 芋道审批流配置流程表单超详细介绍
  • 15.web api 6
  • Unity 中控开发 多路串口服务器(一)
  • 【Goland】:数组与切片
  • 【25-cv-09352】Maradona 品牌维权,从球衣到周边全品类侵权高危
  • Jupyter 中实现交互式图表:ipywidgets 从入门到部署
  • 【数据集】全球大气监测计划(GAW)简介
  • 用户认证技术与HTTP协议
  • 基于pychrm工具的python读取 USB 摄像头(实时+保存录像+摄像头信息打印+镜像)—— OpenCV库
  • 【React Hooks】封装的艺术:如何编写高质量的 React 自-定义 Hooks
  • 【高等数学】第九章 多元函数微分法及其应用——第七节 方向导数与梯度
  • Localhost和127.0.0.1
  • 数据库原理及应用_数据库基础_第2章关系数据库标准语言SQL_数据类型表操作(定义、操作和修改)
  • 终极方案!lightRag/graphRag离线使用tiktoken持续报错SSLError,不改源码,彻底解决!
  • MySQL和HiveSQL在查询上的区别
  • 上网行为管理
  • 用户认证与应用控制技术
  • 深入浅出 SQL:数据库操作的核心语言完全指南
  • 【c++】从灵活到规范:自定义消息机制的设计与实践
  • day10(练习题)
  • Three.js 动画循环学习记录
  • 6 webUI中图生图重绘方式--涂鸦、涂鸦重绘、局部重绘、上传蒙版重绘
  • 生成式引擎优化(GEO)AI搜索优化专家竞争力报告
  • 检测手绘图中不规则曲线交点的方法和一般规则线条交点的方法
  • rom定制系列------小米cc9机型 原生安卓15系统 双版线刷root 定制修改功能项
  • 力扣(分发糖果)
  • 【完整源码+数据集+部署教程】海洋垃圾与生物识别系统源码和数据集:改进yolo11-RVB
  • 深度优先遍历dfs(模板)
  • VS Code Copilot 完整使用教程(含图解)
  • 【笔记ing】考试脑科学 脑科学中的高效记忆法