Python实例题:基于区块链的去中心化应用平台(区块链、智能合约)
目录
Python实例题
题目
问题描述
解题思路
关键代码框架
难点分析
扩展方向
Python实例题
题目
基于区块链的去中心化应用平台(区块链、智能合约)
问题描述
开发一个基于区块链的去中心化应用平台,包含以下功能:
- 区块链基础:实现区块链的数据结构和基本操作
- 共识机制:实现工作量证明 (PoW) 或权益证明 (PoS) 共识
- 智能合约:设计并实现简单的智能合约语言和执行环境
- 去中心化应用:构建基于智能合约的去中心化应用
- 网络通信:实现节点间的 P2P 通信协议
解题思路
- 设计区块链的数据结构和区块格式
- 实现共识算法保证区块链的一致性和安全性
- 开发智能合约虚拟机和编程语言
- 设计 P2P 网络协议实现节点间的通信和同步
- 构建用户界面和 API 供开发者使用
关键代码框架
# 区块链基础实现
import hashlib
import json
import time
from typing import List, Dict, Any, Optional
import requestsclass Block:def __init__(self, index: int, transactions: List[Dict[str, Any]], timestamp: float, previous_hash: str, nonce: int = 0):"""初始化区块参数:index: 区块索引transactions: 交易列表timestamp: 时间戳previous_hash: 前一个区块的哈希值nonce: 用于工作量证明的随机数"""self.index = indexself.transactions = transactionsself.timestamp = timestampself.previous_hash = previous_hashself.nonce = nonceself.hash = self.calculate_hash()def calculate_hash(self) -> str:"""计算区块的哈希值"""block_string = json.dumps({"index": self.index,"transactions": self.transactions,"timestamp": self.timestamp,"previous_hash": self.previous_hash,"nonce": self.nonce}, sort_keys=True).encode()return hashlib.sha256(block_string).hexdigest()def to_dict(self) -> Dict[str, Any]:"""将区块转换为字典形式"""return {"index": self.index,"transactions": self.transactions,"timestamp": self.timestamp,"previous_hash": self.previous_hash,"nonce": self.nonce,"hash": self.hash}class Blockchain:def __init__(self):"""初始化区块链"""self.chain: List[Block] = []self.pending_transactions: List[Dict[str, Any]] = []self.nodes: set = set()# 创建创世区块self.create_genesis_block()def create_genesis_block(self) -> Block:"""创建创世区块"""genesis_block = Block(0, [], time.time(), "0")self.chain.append(genesis_block)return genesis_blockdef get_latest_block(self) -> Block:"""获取最新区块"""return self.chain[-1]def proof_of_work(self, block: Block) -> int:"""工作量证明算法参数:block: 需要进行工作量证明的区块返回:有效的nonce值"""difficulty = 4 # 难度级别,前导0的数量prefix = "0" * difficultyblock.nonce = 0computed_hash = block.calculate_hash()while not computed_hash.startswith(prefix):block.nonce += 1computed_hash = block.calculate_hash()return block.noncedef is_chain_valid(self, chain: List[Block]) -> bool:"""验证区块链的有效性参数:chain: 需要验证的区块链返回:区块链是否有效"""previous_block = chain[0]current_index = 1while current_index < len(chain):block = chain[current_index]# 验证哈希值if block.hash != block.calculate_hash():return False# 验证前一个区块的哈希值if block.previous_hash != previous_block.hash:return False# 验证工作量证明difficulty = 4prefix = "0" * difficultyif not block.hash.startswith(prefix):return Falseprevious_block = blockcurrent_index += 1return Truedef add_block(self, block: Block, proof: int) -> bool:"""添加新区块到区块链参数:block: 要添加的区块proof: 工作量证明的结果返回:是否成功添加区块"""previous_hash = self.get_latest_block().hash# 验证前一个区块的哈希值if previous_hash != block.previous_hash:return False# 验证工作量证明difficulty = 4prefix = "0" * difficultyif not block.hash.startswith(prefix) or block.hash != block.calculate_hash():return Falseself.chain.append(block)return Truedef add_transaction(self, sender: str, recipient: str, amount: float, contract_code: Optional[str] = None) -> int:"""添加新交易到待处理交易池参数:sender: 发送方地址recipient: 接收方地址amount: 交易金额contract_code: 智能合约代码(如果是合约部署交易)返回:交易将被包含的区块索引"""self.pending_transactions.append({"sender": sender,"recipient": recipient,"amount": amount,"timestamp": time.time(),"contract_code": contract_code})return self.get_latest_block().index + 1def mine_block(self, miner_address: str) -> Block:"""挖矿创建新区块参数:miner_address: 矿工地址,用于接收挖矿奖励返回:新创建的区块"""# 添加挖矿奖励交易self.add_transaction(sender="0", # 表示系统奖励recipient=miner_address,amount=10 # 挖矿奖励)# 创建新区块previous_block = self.get_latest_block()new_block = Block(index=previous_block.index + 1,transactions=self.pending_transactions,timestamp=time.time(),previous_hash=previous_block.hash)# 执行工作量证明proof = self.proof_of_work(new_block)# 添加新区块到区块链if self.add_block(new_block, proof):# 清空待处理交易池self.pending_transactions = []return new_blockelse:return Nonedef register_node(self, address: str) -> None:"""注册新节点参数:address: 节点地址"""self.nodes.add(address)def resolve_conflicts(self) -> bool:"""共识算法:解决冲突,使用最长链规则返回:区块链是否被更新"""neighbours = self.nodesnew_chain = Nonemax_length = len(self.chain)# 获取所有邻居节点的区块链for node in neighbours:try:response = requests.get(f"http://{node}/chain")if response.status_code == 200:length = response.json()["length"]chain = response.json()["chain"]# 验证链的有效性并检查是否更长if length > max_length and self.is_chain_valid([Block(**block_data) for block_data in chain]):max_length = lengthnew_chain = chainexcept requests.exceptions.RequestException:continue# 如果找到更长的有效链,则替换当前链if new_chain:self.chain = [Block(**block_data) for block_data in new_chain]return Truereturn False
# 智能合约实现
class Contract:def __init__(self, code: str, owner: str, contract_id: str):"""初始化智能合约参数:code: 合约代码owner: 合约所有者contract_id: 合约ID"""self.code = codeself.owner = ownerself.contract_id = contract_idself.state = {} # 合约状态self.balance = 0 # 合约余额def execute(self, function_name: str, params: Dict[str, Any], caller: str) -> Dict[str, Any]:"""执行智能合约函数参数:function_name: 函数名params: 函数参数caller: 调用者地址返回:执行结果"""# 简单的合约执行环境# 实际应用中需要更复杂的虚拟机和安全机制try:# 这里简化处理,实际应执行合约代码if function_name == "get_balance":return {"success": True, "result": self.balance}elif function_name == "transfer":amount = params.get("amount", 0)recipient = params.get("recipient", "")if amount <= 0 or not recipient:return {"success": False, "error": "Invalid parameters"}if amount > self.balance:return {"success": False, "error": "Insufficient balance"}self.balance -= amount# 这里应该触发一个交易到接收方return {"success": True, "message": f"Transferred {amount} to {recipient}"}elif function_name == "deploy":# 部署新合约的逻辑passelse:return {"success": False, "error": f"Function {function_name} not found"}except Exception as e:return {"success": False, "error": str(e)}class ContractManager:def __init__(self, blockchain: Blockchain):"""初始化智能合约管理器参数:blockchain: 关联的区块链"""self.blockchain = blockchainself.contracts: Dict[str, Contract] = {}def deploy_contract(self, code: str, owner: str) -> str:"""部署新智能合约参数:code: 合约代码owner: 合约所有者返回:合约ID"""# 生成合约IDcontract_id = hashlib.sha256(f"{code}{owner}{time.time()}".encode()).hexdigest()[:16]# 创建合约实例contract = Contract(code, owner, contract_id)# 添加到合约管理器self.contracts[contract_id] = contract# 添加部署交易到区块链self.blockchain.add_transaction(sender=owner,recipient="0", # 表示系统合约amount=0,contract_code=code)return contract_iddef execute_contract(self, contract_id: str, function_name: str, params: Dict[str, Any], caller: str) -> Dict[str, Any]:"""执行智能合约函数参数:contract_id: 合约IDfunction_name: 函数名params: 函数参数caller: 调用者地址返回:执行结果"""if contract_id not in self.contracts:return {"success": False, "error": f"Contract {contract_id} not found"}contract = self.contracts[contract_id]# 添加交易到区块链tx_index = self.blockchain.add_transaction(sender=caller,recipient=contract_id,amount=0, # 无代币转移,只是调用合约contract_code=None)# 执行合约函数result = contract.execute(function_name, params, caller)# 如果执行成功,记录到区块链if result["success"]:# 这里可以添加更多的执行结果记录passreturn result
# P2P网络节点
import socket
import threading
import jsonclass P2PNode:def __init__(self, host: str, port: int, blockchain: Blockchain):"""初始化P2P网络节点参数:host: 主机地址port: 端口号blockchain: 关联的区块链"""self.host = hostself.port = portself.blockchain = blockchainself.peers = set() # 存储连接的对等节点self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.server.bind((self.host, self.port))self.server.listen(10)self.running = False# 注册自身节点到区块链self.blockchain.register_node(f"{self.host}:{self.port}")def start(self) -> None:"""启动节点"""self.running = Trueprint(f"Node listening on {self.host}:{self.port}")# 启动接收线程receive_thread = threading.Thread(target=self._receive_connections)receive_thread.daemon = Truereceive_thread.start()def stop(self) -> None:"""停止节点"""self.running = Falseself.server.close()def connect_to_peer(self, peer_address: str) -> None:"""连接到其他对等节点参数:peer_address: 对等节点地址"""if peer_address == f"{self.host}:{self.port}":returnif peer_address not in self.peers:try:peer_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)host, port = peer_address.split(':')peer_socket.connect((host, int(port)))# 发送握手消息self._send_message(peer_socket, {"type": "handshake","data": {"address": f"{self.host}:{self.port}","chain_length": len(self.blockchain.chain)}})# 添加到对等节点列表self.peers.add(peer_address)# 注册到区块链self.blockchain.register_node(peer_address)# 启动消息处理线程thread = threading.Thread(target=self._handle_peer, args=(peer_socket,))thread.daemon = Truethread.start()print(f"Connected to peer: {peer_address}")except Exception as e:print(f"Failed to connect to peer {peer_address}: {e}")def broadcast(self, message: Dict[str, Any]) -> None:"""广播消息到所有连接的对等节点参数:message: 要广播的消息"""for peer in list(self.peers):try:host, port = peer.split(':')peer_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)peer_socket.connect((host, int(port)))self._send_message(peer_socket, message)peer_socket.close()except Exception as e:print(f"Failed to send message to peer {peer}: {e}")self.peers.discard(peer)def _receive_connections(self) -> None:"""接收新的连接"""while self.running:try:client_socket, client_address = self.server.accept()print(f"New connection from {client_address}")# 启动消息处理线程thread = threading.Thread(target=self._handle_peer, args=(client_socket,))thread.daemon = Truethread.start()except Exception as e:if self.running:print(f"Error receiving connection: {e}")def _handle_peer(self, peer_socket: socket.socket) -> None:"""处理来自对等节点的消息参数:peer_socket: 对等节点的套接字"""while self.running:try:message = self._receive_message(peer_socket)if not message:breakself._process_message(message, peer_socket)except Exception as e:print(f"Error handling peer: {e}")break# 关闭连接peer_socket.close()def _send_message(self, socket: socket.socket, message: Dict[str, Any]) -> None:"""发送消息到指定套接字参数:socket: 目标套接字message: 要发送的消息"""socket.send(json.dumps(message).encode() + b'\n')def _receive_message(self, socket: socket.socket) -> Dict[str, Any]:"""从指定套接字接收消息参数:socket: 源套接字返回:接收到的消息"""data = b''while True:chunk = socket.recv(1024)if not chunk:return Nonedata += chunkif b'\n' in data:messages = data.split(b'\n')for msg in messages[:-1]:try:return json.loads(msg.decode())except json.JSONDecodeError:continuedata = messages[-1]def _process_message(self, message: Dict[str, Any], peer_socket: socket.socket) -> None:"""处理接收到的消息参数:message: 接收到的消息peer_socket: 发送消息的套接字"""message_type = message.get("type")data = message.get("data")if message_type == "handshake":# 处理握手消息peer_address = data.get("address")peer_chain_length = data.get("chain_length")if peer_address and peer_address != f"{self.host}:{self.port}":self.peers.add(peer_address)self.blockchain.register_node(peer_address)# 检查谁的链更长,决定是否需要同步if peer_chain_length > len(self.blockchain.chain):# 请求对方的完整区块链self._send_message(peer_socket, {"type": "request_chain","data": {}})elif message_type == "new_block":# 处理新区块通知block_data = data.get("block")if block_data:new_block = Block(index=block_data["index"],transactions=block_data["transactions"],timestamp=block_data["timestamp"],previous_hash=block_data["previous_hash"],nonce=block_data["nonce"])new_block.hash = block_data["hash"]# 添加新区块self.blockchain.add_block(new_block, new_block.nonce)# 广播新区块给其他节点self.broadcast({"type": "new_block","data": {"block": new_block.to_dict()}})elif message_type == "new_transaction":# 处理新交易通知transaction = data.get("transaction")if transaction:self.blockchain.add_transaction(sender=transaction["sender"],recipient=transaction["recipient"],amount=transaction["amount"],contract_code=transaction.get("contract_code"))# 广播新交易给其他节点self.broadcast({"type": "new_transaction","data": {"transaction": transaction}})elif message_type == "request_chain":# 处理区块链请求self._send_message(peer_socket, {"type": "chain_response","data": {"chain": [block.to_dict() for block in self.blockchain.chain],"length": len(self.blockchain.chain)}})elif message_type == "chain_response":# 处理区块链响应chain_data = data.get("chain")chain_length = data.get("length")if chain_length > len(self.blockchain.chain):# 验证并替换当前链new_chain = [Block(**block_data) for block_data in chain_data]if self.blockchain.is_chain_valid(new_chain):self.blockchain.chain = new_chainprint("Blockchain updated from peer")
# 去中心化应用示例
class DApp:def __init__(self, node: P2PNode, contract_manager: ContractManager, wallet_address: str):"""初始化去中心化应用参数:node: P2P网络节点contract_manager: 智能合约管理器wallet_address: 钱包地址"""self.node = nodeself.contract_manager = contract_managerself.wallet_address = wallet_addressdef deploy_token_contract(self, name: str, symbol: str, total_supply: int) -> str:"""部署代币合约参数:name: 代币名称symbol: 代币符号total_supply: 总供应量返回:合约ID"""# 代币合约代码contract_code = f"""// 简单的ERC20风格代币合约contract {name} {{string public name = "{name}";string public symbol = "{symbol}";uint256 public totalSupply = {total_supply};mapping(address => uint256) public balanceOf;constructor() {{balanceOf[msg.sender] = totalSupply;}}function transfer(address to, uint256 value) public returns (bool) {{require(balanceOf[msg.sender] >= value, "Insufficient balance");balanceOf[msg.sender] -= value;balanceOf[to] += value;return true;}}function balanceOf(address account) public view returns (uint256) {{return balanceOf[account];}}}}"""# 部署合约contract_id = self.contract_manager.deploy_contract(contract_code, self.wallet_address)print(f"代币合约已部署,合约ID: {contract_id}")return contract_iddef transfer_tokens(self, contract_id: str, recipient: str, amount: int) -> Dict[str, Any]:"""转移代币参数:contract_id: 合约IDrecipient: 接收方地址amount: 转移数量返回:交易结果"""# 执行合约函数result = self.contract_manager.execute_contract(contract_id=contract_id,function_name="transfer",params={"to": recipient, "value": amount},caller=self.wallet_address)return resultdef get_balance(self, contract_id: str, address: str) -> int:"""获取代币余额参数:contract_id: 合约IDaddress: 查询地址返回:余额"""# 执行合约函数result = self.contract_manager.execute_contract(contract_id=contract_id,function_name="balanceOf",params={"account": address},caller=self.wallet_address)if result["success"]:return result["result"]else:return 0# 主程序示例
def main():# 创建区块链blockchain = Blockchain()# 创建智能合约管理器contract_manager = ContractManager(blockchain)# 创建P2P节点node = P2PNode(host="localhost", port=5000, blockchain=blockchain)node.start()# 连接到其他节点(如果有)# node.connect_to_peer("localhost:5001")# 创建钱包地址wallet1 = hashlib.sha256("user1".encode()).hexdigest()wallet2 = hashlib.sha256("user2".encode()).hexdigest()# 创建去中心化应用dapp = DApp(node, contract_manager, wallet1)# 部署代币合约token_contract_id = dapp.deploy_token_contract("MyToken", "MTK", 1000000)# 挖矿以确认合约部署print("Mining to confirm contract deployment...")block = blockchain.mine_block(wallet1)print(f"Block mined: {block.index}")# 转移代币print(f"Transferring tokens from {wallet1} to {wallet2}")result = dapp.transfer_tokens(token_contract_id, wallet2, 1000)print(f"Transfer result: {result}")# 挖矿以确认交易print("Mining to confirm transaction...")block = blockchain.mine_block(wallet1)print(f"Block mined: {block.index}")# 查询余额balance1 = dapp.get_balance(token_contract_id, wallet1)balance2 = dapp.get_balance(token_contract_id, wallet2)print(f"{wallet1} balance: {balance1}")print(f"{wallet2} balance: {balance2}")# 显示区块链信息print(f"Blockchain length: {len(blockchain.chain)}")for block in blockchain.chain:print(f"Block {block.index}: {block.hash[:10]}...")for tx in block.transactions:print(f" Transaction: {tx['sender'][:10]}... -> {tx['recipient'][:10]}... ({tx['amount']})")# 停止节点node.stop()if __name__ == "__main__":main()
难点分析
- 共识机制实现:设计和实现安全高效的共识算法
- 智能合约安全:确保智能合约代码的安全性,防止漏洞
- 网络同步:在分布式环境中保持区块链的一致性
- 性能优化:提高区块链的吞吐量和降低延迟
- 用户体验:简化复杂的区块链概念,提供友好的用户界面
扩展方向
- 实现更复杂的智能合约功能
- 添加代币和经济模型
- 开发分布式应用生态系统
- 研究隐私保护技术
- 探索与其他区块链的互操作性