Python So Easy 大虫小呓三部曲 - 高阶篇
你好,我是大虫,很高兴你能坚持学习到这里!
由于之前写的一篇《【全网最全】50个Python处理Excel示例代码,覆盖95%日常使用场景》 大家反馈不错,于是就筹划了本系列教程。
能够来到高阶篇,说明你已经有了相当不错的Python基础。在这里,我们将探索Python的一些高级特性和实际应用。
我们会学习如何编写高性能的代码,如何进行网络编程,如何操作数据库,以及如何开发Web应用等。
这些知识将帮助你完成从编程学习者到实际开发者的转变。
希望你能够享受这段学习旅程,不断地挑战自己,创造出令人惊叹的程序!记住,优秀的程序员不是天生的,而是练出来的!
目录
- 高级函数特性(闭包、匿名函数、偏函数等)
- 元编程(元类、属性访问控制等)
- 并发编程深入(asyncio)
- 网络编程
- 数据库操作
- Web开发基础(Flask/FastAPI)
- 数据分析基础(Pandas/Numpy)
- 性能优化
- 项目部署
- 实战项目:简易Web应用
1. 高级函数特性(闭包、匿名函数、偏函数等)
闭包
# 闭包:内层函数引用外层函数的变量
def outer_function(x):def inner_function(y):return x + y # inner_function引用了outer_function的变量xreturn inner_function# 创建闭包
add_5 = outer_function(5)
print(add_5(3)) # 输出: 8# 闭包的实际应用:创建专门的函数
def make_multiplier(n):def multiplier(x):return x * nreturn multiplierdouble = make_multiplier(2)
triple = make_multiplier(3)print(double(5)) # 10
print(triple(5)) # 15
匿名函数(lambda)
# 匿名函数(lambda)
# 简洁地定义简单函数
square = lambda x: x ** 2
print(square(4)) # 16# 在高阶函数中使用lambda
numbers = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x**2, numbers))
print(squared) # [1, 4, 9, 16, 25]# 过滤偶数
evens = list(filter(lambda x: x % 2 == 0, numbers))
print(evens) # [2, 4]# 排序复杂对象
students = [("大虫", 85), ("小虫", 90), ("小明", 78)]
# 按分数排序
sorted_students = sorted(students, key=lambda x: x[1])
print(sorted_students) # [('小明', 78), ('大虫', 85), ('小虫', 90)]
偏函数
# 偏函数
from functools import partialdef power(base, exponent):return base ** exponent# 创建平方函数和立方函数
square = partial(power, exponent=2)
cube = partial(power, exponent=3)print(square(4)) # 16
print(cube(3)) # 27# 偏函数在实际中的应用
int_base2 = partial(int, base=2) # 二进制转十进制
print(int_base2("1010")) # 10
函数属性和注解
# 函数属性和注解
def greet(name: str, age: int) -> str:"""问候函数"""return f"你好 {name},你今年 {age} 岁"print(greet.__name__) # 函数名
print(greet.__doc__) # 文档字符串
print(greet.__annotations__) # 参数和返回值注解
2. 元编程(元类、属性访问控制等)
元编程是指编写能够操作程序本身的代码,例如创建或修改类和函数。虽然这是一个高级主题,但了解它可以让你更深入地理解Python的工作原理。
元编程就像是"代码的代码",让你拥有改变程序结构的超能力。(元类、属性访问控制等)
元编程是指编写能够操作程序本身的代码,例如创建或修改类和函数。
属性访问控制
# 属性访问控制
class Person:def __init__(self, name):self._name = name# 控制属性访问def __getattr__(self, name):return f"属性 {name} 不存在"# 控制属性设置def __setattr__(self, name, value):if name.startswith('_'):super().__setattr__(name, value)else:print(f"设置属性 {name} 为 {value}")super().__setattr__(name, value)# 控制属性删除def __delattr__(self, name):if name.startswith('_'):super().__delattr__(name)else:print(f"删除属性 {name}")super().__delattr__(name)person = Person("大虫")
print(person.name) # 触发 __getattr__
person.age = 25 # 触发 __setattr__
del person.age # 触发 __delattr__
描述符
# 描述符
class PositiveNumber:def __init__(self):self.value = 0def __get__(self, obj, objtype=None):return self.valuedef __set__(self, obj, value):if not isinstance(value, (int, float)) or value <= 0:raise ValueError("值必须是正数")self.value = valuedef __delete__(self, obj):self.value = 0class Product:price = PositiveNumber()product = Product()
product.price = 100
print(product.price) # 100
# product.price = -10 # 会抛出异常
使用@property装饰器
# 使用@property装饰器
class Circle:def __init__(self, radius):self._radius = radius@propertydef radius(self):return self._radius@radius.setterdef radius(self, value):if value <= 0:raise ValueError("半径必须是正数")self._radius = value@propertydef area(self):return 3.14159 * self._radius ** 2circle = Circle(5)
print(circle.area) # 78.53975
circle.radius = 10
print(circle.area) # 314.159
元类示例
# 元类示例
class MetaSingleton(type):_instances = {}def __call__(cls, *args, **kwargs):if cls not in cls._instances:cls._instances[cls] = super().__call__(*args, **kwargs)return cls._instances[cls]class Database(metaclass=MetaSingleton):def __init__(self):self.connection = "数据库连接"# 测试单例模式
db1 = Database()
db2 = Database()
print(db1 is db2) # True,两个实例是同一个对象
3. 并发编程深入(asyncio)
异步编程用于处理I/O密集型任务,可以显著提高程序的性能。
需要安装aiohttp:
pip install aiohttp
异步函数
# asyncio用于异步编程,处理I/O密集型任务import asyncio
import aiohttp
import time# 异步函数
async def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():print(f"开始时间: {time.strftime('%X')}")# 顺序执行await say_after(1, 'hello')await say_after(2, 'world')print(f"结束时间: {time.strftime('%X')}")# 运行异步函数
# asyncio.run(main())
并发执行
# 并发执行
async def main_concurrent():print(f"开始时间: {time.strftime('%X')}")# 并发执行await asyncio.gather(say_after(1, 'hello'),say_after(2, 'world'))print(f"结束时间: {time.strftime('%X')}")# asyncio.run(main_concurrent())
异步HTTP请求示例
# 异步HTTP请求示例
async def fetch_url(session, url):async with session.get(url) as response:return await response.text()async def fetch_multiple_urls():urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/delay/3']async with aiohttp.ClientSession() as session:# 并发请求多个URLtasks = [fetch_url(session, url) for url in urls]results = await asyncio.gather(*tasks)return results# 运行HTTP请求示例(解开下面两行注释运行代码)
# results = asyncio.run(fetch_multiple_urls())
# print(f"获取到 {len(results)} 个响应")
异步生成器和上下文管理器
# 异步生成器
async def async_range(start, stop):for i in range(start, stop):await asyncio.sleep(0.1) # 模拟异步操作yield iasync def main_async_gen():async for i in async_range(0, 5):print(i)# asyncio.run(main_async_gen())# 异步上下文管理器
class AsyncContextManager:async def __aenter__(self):print("进入异步上下文")return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):print("退出异步上下文")async def use_async_context():async with AsyncContextManager() as acm:print("在异步上下文中")# asyncio.run(use_async_context())
4. 网络编程
网络编程是构建分布式应用的基础,Python提供了强大的网络编程支持。
TCP服务器示例
import socket
import threadingdef handle_client(client_socket, address):print(f"连接来自: {address}")try:while True:# 接收数据data = client_socket.recv(1024)if not data:breakmessage = data.decode('utf-8')print(f"收到消息: {message}")# 回复客户端response = f"服务器收到: {message}"client_socket.send(response.encode('utf-8'))except Exception as e:print(f"处理客户端时出错: {e}")finally:client_socket.close()print(f"客户端 {address} 断开连接")def start_server():server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.bind(('localhost', 9999))server.listen(5)print("服务器启动,监听端口 9999...")try:while True:client_socket, address = server.accept()# 为每个客户端创建新线程client_thread = threading.Thread(target=handle_client,args=(client_socket, address))client_thread.start()except KeyboardInterrupt:print("服务器关闭")finally:server.close()
TCP客户端示例
def start_client():client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)client.connect(('localhost', 9999))try:while True:message = input("请输入消息 (输入'quit'退出): ")if message == 'quit':breakclient.send(message.encode('utf-8'))response = client.recv(1024)print(f"服务器回复: {response.decode('utf-8')}")finally:client.close()
HTTP服务器示例
HTTP服务器示例(使用内置模块)
from http.server import HTTPServer, BaseHTTPRequestHandler
import jsonclass MyHandler(BaseHTTPRequestHandler):def do_GET(self):if self.path == '/':self.send_response(200)self.send_header('Content-type', 'text/html')self.end_headers()self.wfile.write(b"<h1>Hello from Python HTTP Server!</h1>")elif self.path == '/api/data':self.send_response(200)self.send_header('Content-type', 'application/json')self.end_headers()data = {"message": "Hello API", "status": "success"}self.wfile.write(json.dumps(data).encode('utf-8'))else:self.send_response(404)self.end_headers()def do_POST(self):if self.path == '/api/echo':content_length = int(self.headers['Content-Length'])post_data = self.rfile.read(content_length)self.send_response(200)self.send_header('Content-type', 'application/json')self.end_headers()response = {"received": json.loads(post_data.decode('utf-8')),"message": "Data received successfully"}self.wfile.write(json.dumps(response).encode('utf-8'))def start_http_server():server = HTTPServer(('localhost', 8000), MyHandler)print("HTTP服务器启动,访问 http://localhost:8000")try:server.serve_forever()except KeyboardInterrupt:print("服务器关闭")server.server_close()# 注意:这些网络示例需要在不同终端中分别运行服务器和客户端
# 为了文档完整性,我们注释掉实际运行的代码
# 服务器: start_server() 或 start_http_server()
# 客户端: start_client()
5. 数据库操作
数据库操作示例(使用SQLite)
import sqlite3
from contextlib import contextmanager# 创建数据库连接的上下文管理器
@contextmanager
def get_db_connection(db_name="example.db"):conn = sqlite3.connect(db_name)try:yield connexcept Exception:conn.rollback()raiseelse:conn.commit()finally:conn.close()# 初始化数据库
def init_database():with get_db_connection() as conn:cursor = conn.cursor()cursor.execute('''CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,email TEXT UNIQUE NOT NULL,age INTEGER)''')print("数据库初始化完成")# 插入用户
def add_user(name, email, age):with get_db_connection() as conn:cursor = conn.cursor()cursor.execute("INSERT INTO users (name, email, age) VALUES (?, ?, ?)",(name, email, age))print(f"用户 {name} 添加成功")# 查询所有用户
def get_all_users():with get_db_connection() as conn:cursor = conn.cursor()cursor.execute("SELECT * FROM users")return cursor.fetchall()# 根据ID查询用户
def get_user_by_id(user_id):with get_db_connection() as conn:cursor = conn.cursor()cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))return cursor.fetchone()# 更新用户信息
def update_user(user_id, name=None, email=None, age=None):with get_db_connection() as conn:cursor = conn.cursor()if name:cursor.execute("UPDATE users SET name = ? WHERE id = ?", (name, user_id))if email:cursor.execute("UPDATE users SET email = ? WHERE id = ?", (email, user_id))if age:cursor.execute("UPDATE users SET age = ? WHERE id = ?", (age, user_id))print(f"用户 {user_id} 信息更新完成")# 删除用户
def delete_user(user_id):with get_db_connection() as conn:cursor = conn.cursor()cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))print(f"用户 {user_id} 删除完成")# 使用示例
if __name__ == "__main__":init_database()# 添加用户add_user("大虫", "dachong@example.com", 25)add_user("小虫", "xiaochong@example.com", 30)# 查询所有用户users = get_all_users()print("所有用户:")for user in users:print(user)# 更新用户update_user(1, age=26)# 查询特定用户user = get_user_by_id(1)print(f"用户1的信息: {user}")# 删除用户delete_user(2)
使用SQLAlchemy ORM的示例
需要安装: pip install sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmakerBase = declarative_base()class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String)email = Column(String)age = Column(Integer)# 创建数据库引擎
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)# 创建会话
Session = sessionmaker(bind=engine)
session = Session()# 添加用户
new_user = User(name="小明", email="xiaoming@example.com", age=28)
session.add(new_user)
session.commit()# 查询用户
users = session.query(User).all()
for user in users:print(user.name, user.email, user.age)session.close()
6. Web开发基础(Flask/FastAPI)
Flask Web应用示例
需要安装: pip install flask
from flask import Flask, request, jsonify, render_template_stringapp = Flask(__name__)# 简单的HTML模板
HTML_TEMPLATE = '''
<!DOCTYPE html>
<html>
<head><title>Flask应用</title>
</head>
<body><h1>欢迎来到Flask应用</h1><p>当前时间: {{ time }}</p><form method="POST" action="/greet"><input type="text" name="name" placeholder="输入你的名字" required><button type="submit">问候</button></form>
</body>
</html>
'''@app.route('/')
def home():from datetime import datetimereturn render_template_string(HTML_TEMPLATE, time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))@app.route('/api/users', methods=['GET'])
def get_users():users = [{"id": 1, "name": "大虫", "age": 25},{"id": 2, "name": "小虫", "age": 30}]return jsonify(users)@app.route('/api/users', methods=['POST'])
def create_user():data = request.get_json()# 这里应该将数据保存到数据库return jsonify({"message": "用户创建成功", "user": data}), 201@app.route('/greet', methods=['POST'])
def greet():name = request.form.get('name')return f"<h1>你好, {name}!</h1><a href='/'>返回首页</a>"if __name__ == '__main__':app.run(debug=True)
FastAPI示例
需要安装: pip install fastapi uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Listapp = FastAPI()class User(BaseModel):id: intname: strage: int# 模拟数据库
users_db = [User(id=1, name="大虫", age=25),User(id=2, name="小虫", age=30)
]@app.get("/")
def read_root():return {"message": "欢迎来到FastAPI应用"}@app.get("/users", response_model=List[User])
def read_users():return users_db@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int):for user in users_db:if user.id == user_id:return userreturn {"error": "用户未找到"}@app.post("/users", response_model=User)
def create_user(user: User):users_db.append(user)return user# 运行: uvicorn main:app --reload
7. 数据分析基础(Pandas/Numpy)
数据分析示例(需要安装: pip install pandas numpy matplotlib)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt# 创建示例数据
data = {'name': ['大虫', '小虫', '小明', '小红', '小蓝'],'age': [25, 30, 35, 28, 32],'salary': [8000, 12000, 15000, 10000, 13000],'department': ['IT', 'HR', 'IT', 'Finance', 'IT']
}# 创建DataFrame
df = pd.DataFrame(data)
print("原始数据:")
print(df)# 基本统计信息
print("\n基本统计信息:")
print(df.describe())# 按部门分组统计
print("\n按部门统计平均薪资:")
print(df.groupby('department')['salary'].mean())# 数据筛选
it_employees = df[df['department'] == 'IT']
print("\nIT部门员工:")
print(it_employees)# 添加新列
df['bonus'] = df['salary'] * 0.1
print("\n添加奖金列后:")
print(df)# 数据排序
df_sorted = df.sort_values('salary', ascending=False)
print("\n按薪资排序:")
print(df_sorted)# 使用numpy进行数值计算
ages = np.array(df['age'])
salaries = np.array(df['salary'])print(f"\n年龄统计: 平均={np.mean(ages):.1f}, 标准差={np.std(ages):.1f}")
print(f"薪资统计: 平均={np.mean(salaries):.0f}, 最大值={np.max(salaries)}")# 简单数据可视化
plt.figure(figsize=(10, 4))plt.subplot(1, 2, 1)
plt.bar(df['name'], df['salary'])
plt.title('员工薪资')
plt.xticks(rotation=45)plt.subplot(1, 2, 2)
plt.hist(df['age'], bins=5)
plt.title('员工年龄分布')plt.tight_layout()
plt.show()
8. 性能优化
# 性能优化技巧import time
import cProfile
from functools import lru_cache# 1. 使用列表推导式而不是循环
def slow_way():result = []for i in range(100000):if i % 2 == 0:result.append(i * 2)return resultdef fast_way():return [i * 2 for i in range(100000) if i % 2 == 0]# 2. 使用生成器处理大数据
def generate_numbers(n):for i in range(n):yield i ** 2# 3. 缓存结果避免重复计算
@lru_cache(maxsize=128)
def fibonacci(n):if n < 2:return nreturn fibonacci(n-1) + fibonacci(n-2)# 4. 使用适当的内置函数
# 使用join而不是+连接字符串
def slow_string_concat(items):result = ""for item in items:result += itemreturn resultdef fast_string_concat(items):return "".join(items)# 5. 使用局部变量
def slow_loop():result = []for i in range(100000):result.append(i * i)return resultdef fast_loop():result = []append = result.append # 局部变量访问更快for i in range(100000):append(i * i)return result# 性能测试
def performance_test():# 测试列表推导式start = time.time()slow_way()slow_time = time.time() - startstart = time.time()fast_way()fast_time = time.time() - startprint(f"列表推导式性能对比 - 慢方法: {slow_time:.4f}s, 快方法: {fast_time:.4f}s")# 测试斐波那契数列start = time.time()fibonacci(35)fib_time = time.time() - startprint(f"缓存斐波那契计算时间: {fib_time:.4f}s")# 测试字符串连接items = ["item"] * 10000start = time.time()slow_string_concat(items)slow_str_time = time.time() - startstart = time.time()fast_string_concat(items)fast_str_time = time.time() - startprint(f"字符串连接性能对比 - 慢方法: {slow_str_time:.4f}s, 快方法: {fast_str_time:.4f}s")# 使用cProfile进行性能分析
def profile_example():def calculate():total = 0for i in range(1000000):total += i * ireturn total# 分析函数性能cProfile.run('calculate()')# 运行性能测试
# performance_test()
# profile_example()# 使用__slots__减少内存使用
class RegularClass:def __init__(self, x, y):self.x = xself.y = yclass SlottedClass:__slots__ = ['x', 'y']def __init__(self, x, y):self.x = xself.y = y# 比较内存使用
import sys
regular = RegularClass(1, 2)
slotted = SlottedClass(1, 2)print(f"普通类实例大小: {sys.getsizeof(regular)} bytes")
print(f"带slots的类实例大小: {sys.getsizeof(slotted)} bytes")
9. 项目部署
项目部署相关知识
- 创建requirements.txt
pip freeze > requirements.txt
- 使用Docker部署Python应用
Dockerfile示例:
FROM python:3.9-slimWORKDIR /appCOPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txtCOPY . .CMD ["python", "app.py"]
- 使用Gunicorn部署Flask应用(需要安装: pip install gunicorn)
gunicorn --workers 4 --bind 0.0.0.0:8000 app:app
- 使用Supervisor管理进程
supervisor配置文件示例:
[program:myapp]
command=/path/to/venv/bin/python /path/to/app.py
directory=/path/to/app
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/myapp.log
- 使用Nginx作为反向代理
nginx配置示例:
server {listen 80;server_name example.com;location / {proxy_pass http://127.0.0.1:8000;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;}
}
- 使用环境变量管理配置
import os
from typing import Optionalclass Config:SECRET_KEY: Optional[str] = os.environ.get('SECRET_KEY') or 'dev-secret-key'DATABASE_URL: Optional[str] = os.environ.get('DATABASE_URL') or 'sqlite:///app.db'DEBUG: bool = os.environ.get('DEBUG', 'False').lower() == 'true'
- 日志配置
import logging
from logging.handlers import RotatingFileHandlerdef setup_logging():# 创建日志记录器logger = logging.getLogger('myapp')logger.setLevel(logging.INFO)# 创建文件处理器,文件大小超过1MB时轮转,保留5个备份file_handler = RotatingFileHandler('app.log', maxBytes=1024*1024, backupCount=5)file_handler.setLevel(logging.INFO)# 创建控制台处理器console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)# 创建格式化器formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')file_handler.setFormatter(formatter)console_handler.setFormatter(formatter)# 添加处理器到记录器logger.addHandler(file_handler)logger.addHandler(console_handler)return logger# logger = setup_logging()
# logger.info("应用启动")
10. 实战项目:简易Web应用
项目介绍
这个项目将综合运用Web开发、数据库操作、前后端交互等多个高级知识点,创建一个基于Flask的简易任务管理系统。
通过这个项目,你可以体验到Web应用开发的完整流程。虽然界面可能不够炫酷,但功能完整,足以让你体验到Web开发的魅力!
简易任务管理系统(使用Flask)
- 执行以下命令安装flask:
pip install flask
- 创建 app.py 文件,内容如下:
from flask import Flask, request, jsonify, render_template_string
from datetime import datetime
import json
import osapp = Flask(__name__)# 简单的HTML模板
HTML_TEMPLATE =`
<!DOCTYPE html>
<html>
<head><meta charset="UTF-8"><title>任务管理系统</title><style>body { font-family: Arial, sans-serif; margin: 40px; }.task { border: 1px solid #ddd; padding: 10px; margin: 10px 0; }.completed { background-color: #e8f5e8; }form { margin: 20px 0; }input, button { padding: 8px; margin: 5px; }</style>
</head>
<body><h1>任务管理系统</h1><form id="taskForm"><input type="text" id="title" placeholder="任务标题" required><input type="text" id="description" placeholder="任务描述"><button type="submit">添加任务</button></form><div id="tasks">{% for task in tasks %}<div class="task {% if task.completed %}completed{% endif %}"><h3>{{ task.title }}</h3><p>{{ task.description }}</p><small>创建时间: {{ task.created_at }}</small><br><small>ID: {{ task.id }}</small><br><button onclick="toggleTask({{ task.id }})">{% if task.completed %}标记未完成{% else %}标记完成{% endif %}</button><button onclick="deleteTask({{ task.id }})">删除</button></div>{% endfor %}</div><script>// 添加任务document.getElementById('taskForm').addEventListener('submit', function(e) {e.preventDefault();const title = document.getElementById('title').value;const description = document.getElementById('description').value;fetch('/api/tasks', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({title: title, description: description})}).then(response => response.json()).then(data => {location.reload();});});// 切换任务状态function toggleTask(taskId) {fetch(`/api/tasks/${taskId}/toggle`, {method: 'PUT'}).then(response => response.json()).then(data => {location.reload();});}// 删除任务function deleteTask(taskId) {if (confirm('确定要删除这个任务吗?')) {fetch(`/api/tasks/${taskId}`, {method: 'DELETE'}).then(response => response.json()).then(data => {location.reload();});}}</script>
</body>
</html>
`# 数据存储
DATA_FILE = 'tasks.json'def load_tasks():if os.path.exists(DATA_FILE):with open(DATA_FILE, 'r', encoding='utf-8') as f:return json.load(f)return []def save_tasks(tasks):with open(DATA_FILE, 'w', encoding='utf-8') as f:json.dump(tasks, f, ensure_ascii=False, indent=2)# 任务模型
class Task:def __init__(self, title, description="", task_id=None):self.id = task_id or int(datetime.now().timestamp() * 1000)self.title = titleself.description = descriptionself.completed = Falseself.created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")def to_dict(self):return {'id': self.id,'title': self.title,'description': self.description,'completed': self.completed,'created_at': self.created_at}@classmethoddef from_dict(cls, data):task = cls(data['title'], data['description'], data['id'])task.completed = data['completed']task.created_at = data['created_at']return task# 路由定义
@app.route('/')
def index():tasks = load_tasks()task_objects = [Task.from_dict(task) for task in tasks]return render_template_string(HTML_TEMPLATE, tasks=task_objects)@app.route('/api/tasks', methods=['GET'])
def get_tasks():tasks = load_tasks()return jsonify(tasks)@app.route('/api/tasks', methods=['POST'])
def create_task():data = request.get_json()task = Task(data['title'], data.get('description', ''))tasks = load_tasks()tasks.append(task.to_dict())save_tasks(tasks)return jsonify(task.to_dict()), 201@app.route('/api/tasks/<int:task_id>/toggle', methods=['PUT'])
def toggle_task(task_id):tasks = load_tasks()for task in tasks:if task['id'] == task_id:task['completed'] = not task['completed']save_tasks(tasks)return jsonify(task)return jsonify({'error': 'Task not found'}), 404@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
def delete_task(task_id):tasks = load_tasks()tasks = [task for task in tasks if task['id'] != task_id]save_tasks(tasks)return jsonify({'message': 'Task deleted'})if __name__ == '__main__':app.run(debug=True)
- 运行这个Web应用,执行以下命令:
python app.py
- 在浏览器中访问 http://localhost:5000
恭喜你完成了高阶篇的学习!现在你已经掌握了Python的高级特性和实际应用,可以开始构建自己的项目了。
记住,编程之路没有终点,持续学习和实践才是王道。加油,未来的Python大牛!哈哈!