Building a Full-Stack Web Application with FastAPI, React, and MongoDB (05): FastAPI Quick Start
1. FastAPI Overview
1.1 What Is FastAPI
FastAPI is a modern, high-performance Python web framework for building APIs. It is gaining popularity quickly because it is easy to use, fast, and feature-rich. Built on top of Starlette, FastAPI uses asynchronous programming and type hints to deliver strong performance while keeping code readable.
Key Features of FastAPI
- High Performance: FastAPI is very fast, often on par with Node.js and Go frameworks.
- Fast to Code: Type hints and automatic data validation significantly reduce development time.
- Easy to Use: Intuitive syntax and clear documentation make it accessible to developers of all levels.
- Rich Feature Set: Includes data validation, automatic documentation, dependency injection, and more.
- Data Validation: Validates incoming data automatically against Python type hints, which protects data integrity.
- Automatic Documentation: Generates interactive API documentation with Swagger UI, so APIs are easy to understand and test.
- Dependency Injection: Manages dependencies efficiently, improving code organisation and testability.
- Asynchronous Support: Handles multiple requests concurrently, improving performance and scalability.
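To make the asynchronous support point concrete, here is a minimal sketch that uses only the standard library `asyncio` module, not FastAPI itself. The `handle_request` coroutine is a hypothetical stand-in for an `async def` endpoint that waits on I/O; the point is that several such waits can overlap instead of running one after another.

```python
import asyncio


async def handle_request(request_id: int) -> str:
    # Simulate an I/O wait (for example a database query).
    # While this coroutine sleeps, the event loop can run the others.
    await asyncio.sleep(0.1)
    return f"response {request_id}"


async def main() -> list:
    # The three "requests" are awaited together, so their waits overlap.
    return await asyncio.gather(*(handle_request(i) for i in range(3)))


responses = asyncio.run(main())
print(responses)
```

`asyncio.gather` returns results in the order the coroutines were passed in, regardless of which one finishes first.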
1.2 Creating a Basic FastAPI Application
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello, World!"}
This code defines a basic FastAPI application with a single endpoint, /. When a GET request is made to this endpoint, the root function is called and returns a JSON response containing the message "Hello, World!".
1.3 Validating Data with Pydantic
FastAPI integrates with Pydantic for robust data validation. Let's create a model to validate incoming data:
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float
    is_available: bool


@app.post("/items/")
async def create_item(item: Item):
    return item
In this example, the Item model defines the data structure expected in incoming requests. Because create_item declares its parameter as an Item, FastAPI validates the request body against the model before the function runs. If the data is invalid, FastAPI returns an error response describing the problem.
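The validation itself is done by Pydantic, so it can be tried without starting a server. The following is a minimal sketch that reuses the Item model from above and feeds it one valid and one invalid payload; the sample values ("Pen", "9.99", "cheap") are made up for illustration.

```python
from pydantic import BaseModel, ValidationError


class Item(BaseModel):
    name: str
    price: float
    is_available: bool


# Valid input: the numeric string "9.99" is coerced to a float.
item = Item(name="Pen", price="9.99", is_available=True)
print(item.price)

# Invalid input: "cheap" cannot be parsed as a float.
bad_fields = []
try:
    Item(name="Pen", price="cheap", is_available=True)
except ValidationError as exc:
    # Each error records the location of the offending field.
    bad_fields = [err["loc"] for err in exc.errors()]
    print(bad_fields)
```

When the same failure happens inside a FastAPI endpoint, the framework turns it into a 422 response whose body lists these error locations.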
FastAPI is a powerful and efficient framework for building APIs with Python. Its focus on performance, ease of use, and data validation makes it a good choice for a wide range of projects. In the following sections, we will explore more advanced topics such as database integration, authentication, and deployment, and build a complete web application with FastAPI, React, and MongoDB.
2. Creating the Backend Project
2.1 Directory Structure
fastapi_project/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── models.py
│   ├── database.py
│   └── routes.py
└── requirements.txt
2.2 Models
app/models.py
from pydantic import BaseModel


class Item(BaseModel):
    id: str
    name: str
    description: str | None = None  # optional field; the "|" syntax needs Python 3.10+
    price: float
    tax: float = 10.5
    tags: list[str] = []
2.3 Database
app/database.py
import motor.motor_asyncio

# One shared client for the whole application
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017/")


async def get_database():
    db = client["fastapi_db"]
    return db
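2.4 Routes
app/main.py (the entry point, which registers the router defined in app/routes.py)
from fastapi import FastAPI

from app import routes

app = FastAPI()
# Attach every endpoint declared on routes.router to the application
app.include_router(routes.router)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8080)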
3. Request Handling
3.1 Path Parameters
Server:
from bson import ObjectId
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel

app = FastAPI()

# MongoDB connection
client = AsyncIOMotorClient("mongodb://zhangdapeng:zhangdapeng520@localhost:27017")
db = client["blogdb"]
collection = db["blogs"]

# Allow requests from any origin
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Blog model
class Blog(BaseModel):
    title: str
    content: str
    author: str
    created_at: str


# Create a blog
@app.post("/blogs/")
async def create_blog(blog: Blog):
    result = await collection.insert_one(blog.dict())
    return {"_id": str(result.inserted_id)}


# Get all blogs
@app.get("/blogs/")
async def get_blogs():
    blogs = await collection.find().to_list(length=100)
    # "_id" must come after **blog so the string replaces the raw ObjectId,
    # which cannot be serialized to JSON
    return [{**blog, "_id": str(blog["_id"])} for blog in blogs]


# Get a single blog by path parameter
@app.get("/blogs/{blog_id}")
async def get_blog(blog_id: str):
    # Reject malformed ids up front; ObjectId() would raise on them
    if not ObjectId.is_valid(blog_id):
        raise HTTPException(status_code=400, detail="Invalid blog id")
    blog = await collection.find_one({"_id": ObjectId(blog_id)})
    if not blog:
        raise HTTPException(status_code=404, detail="Blog not found")
    return {**blog, "_id": str(blog["_id"])}


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8080)
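When a MongoDB document is returned from an endpoint, its `_id` has to be converted to a string, and the position of that key in the dict literal matters because later keys override earlier ones. The sketch below shows the difference with plain dictionaries. `FakeObjectId` and `serialize_doc` are hypothetical names introduced here so the example runs without MongoDB or the bson package.

```python
class FakeObjectId:
    """Stand-in for bson.ObjectId so the sketch runs without MongoDB."""

    def __init__(self, value: str) -> None:
        self.value = value

    def __str__(self) -> str:
        return self.value


def serialize_doc(doc: dict) -> dict:
    """Return a copy of a document with `_id` converted to a string."""
    # Later keys win in a dict literal, so the override goes after **doc.
    return {**doc, "_id": str(doc["_id"])}


doc = {"_id": FakeObjectId("665f1c2e8b3e4a0012345678"), "title": "Hello"}

# Wrong order: **doc puts the original object back over the string.
wrong = {"_id": str(doc["_id"]), **doc}
right = serialize_doc(doc)

print(type(wrong["_id"]).__name__)
print(type(right["_id"]).__name__)
```

With a real ObjectId, the first form leaves a value in the response that the JSON encoder cannot handle, while the second form produces a plain string.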
React rendering:
import { useState, useEffect } from 'react';
import './App.css';

function App() {
  const [blogs, setBlogs] = useState([]);
  const [selectedBlog, setSelectedBlog] = useState(null);
  const [newBlog, setNewBlog] = useState({ title: '', content: '', author: '', created_at: '' });

  useEffect(() => {
    // Fetch all blogs
    fetch('http://127.0.0.1:8080/blogs/')
      .then(response => response.json())
      .then(data => setBlogs(data))
      .catch(error => console.error('Error fetching blogs:', error));
  }, []);

  const handleBlogClick = (blogId) => {
    // Fetch a single blog
    fetch(`http://127.0.0.1:8080/blogs/${blogId}`)
      .then(response => response.json())
      .then(data => setSelectedBlog(data))
      .catch(error => console.error('Error fetching blog:', error));
  };

  const handleInputChange = (e) => {
    const { name, value } = e.target;
    setNewBlog(prev => ({ ...prev, [name]: value }));
  };

  const handleSubmit = (e) => {
    e.preventDefault();
    fetch('http://127.0.0.1:8080/blogs/', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify(newBlog)
    })
      .then(response => response.json())
      .then(data => {
        setBlogs(prev => [...prev, { ...newBlog, _id: data._id }]);
        setNewBlog({ title: '', content: '', author: '', created_at: '' });
      })
      .catch(error => console.error('Error submitting blog:', error));
  };

  return (
    <div className="App">
      <header className="App-header">
        <h1>Blog List</h1>
        <div>
          {blogs.map(blog => (
            <div key={blog._id} className="blog-card" onClick={() => handleBlogClick(blog._id)}>
              <h2>{blog.title}</h2>
            </div>
          ))}
        </div>
        {selectedBlog && (
          <div className="blog-detail">
            <h2>{selectedBlog.title}</h2>
            <p>{selectedBlog.content}</p>
            <p>Author: {selectedBlog.author}</p>
            <p>Created at: {selectedBlog.created_at}</p>
          </div>
        )}
        <h2>New Blog</h2>
        <form onSubmit={handleSubmit}>
          <input
            type="text"
            name="title"
            value={newBlog.title}
            onChange={handleInputChange}
            placeholder="Title"
            required
          />
          <textarea
            name="content"
            value={newBlog.content}
            onChange={handleInputChange}
            placeholder="Content"
            required
          />
          <input
            type="text"
            name="author"
            value={newBlog.author}
            onChange={handleInputChange}
            placeholder="Author"
            required
          />
          <input
            type="text"
            name="created_at"
            value={newBlog.created_at}
            onChange={handleInputChange}
            placeholder="Created at"
            required
          />
          <button type="submit">Submit</button>
        </form>
      </header>
    </div>
  );
}

export default App;
Styles (App.css):
* {
  margin: 0;
  padding: 0;
  box-sizing: border-box;
}

.App {
  text-align: center;
}

.App-header {
  background-color: #282c34;
  min-height: 100vh;
  display: flex;
  flex-direction: column;
  align-items: center;
  justify-content: center;
  font-size: calc(10px + 2vmin);
  color: white;
}

.blog-card {
  background-color: #333;
  padding: 20px;
  margin: 10px 0;
  border-radius: 8px;
  box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
  cursor: pointer;
}

.blog-card h2 {
  margin-top: 0;
}

.blog-card p {
  margin: 10px 0;
}

.blog-detail {
  background-color: #444;
  padding: 20px;
  margin: 10px 0;
  border-radius: 8px;
  box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
}

.blog-detail h2 {
  margin-top: 0;
}

.blog-detail p {
  margin: 10px 0;
}

form {
  display: flex;
  flex-direction: column;
  align-items: center;
}

input, textarea {
  margin: 10px 0;
  padding: 10px;
  border: 1px solid #ccc;
  border-radius: 4px;
  width: 80%;
}

button {
  padding: 10px 20px;
  background-color: #61dafb;
  border: none;
  border-radius: 4px;
  cursor: pointer;
}

button:hover {
  background-color: #007bff;
}
3.2 Query Parameters
Backend code:
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel

app = FastAPI()

# MongoDB connection
client = AsyncIOMotorClient("mongodb://zhangdapeng:zhangdapeng520@localhost:27017")
db = client["blogdb"]
collection = db["blogs"]

# Allow requests from any origin
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Blog model
class Blog(BaseModel):
    title: str
    content: str
    author: str
    created_at: str


# Insert 100 test records
# Note: this runs on every startup, so each restart adds another 100 documents
async def create_test_data():
    for i in range(100):
        blog = Blog(
            title=f"Test blog {i + 1}",
            content=f"This is the content of blog {i + 1}",
            author=f"Author {i + 1}",
            created_at="2025-05-10 12:33:33",
        )
        await collection.insert_one(blog.dict())


# Create the test data when the application starts
@app.on_event("startup")
async def startup_event():
    await create_test_data()


# Paginated blog query
@app.get("/blogs/")
async def get_blogs(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
):
    skip = (page - 1) * page_size
    blogs = await collection.find().skip(skip).limit(page_size).to_list(length=page_size)
    total = await collection.count_documents({})
    total_pages = (total + page_size - 1) // page_size
    data = []
    for blog in blogs:
        blog["_id"] = str(blog["_id"])
        data.append(blog)
    return {
        "blogs": data,
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
    }


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8080)
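The pagination arithmetic in get_blogs can be checked on its own. The sketch below pulls the two formulas into a hypothetical `paginate` helper (the name and return shape are assumptions for illustration) and runs them on a couple of sample values.

```python
def paginate(total: int, page: int, page_size: int) -> dict:
    """Compute the skip offset and page count used by the paginated query."""
    # Page 1 skips nothing, page 2 skips one full page, and so on.
    skip = (page - 1) * page_size
    # Ceiling division without floats: a partial last page still counts.
    total_pages = (total + page_size - 1) // page_size
    return {"skip": skip, "total_pages": total_pages}


print(paginate(total=100, page=3, page_size=10))  # {'skip': 20, 'total_pages': 10}
print(paginate(total=101, page=1, page_size=10))  # {'skip': 0, 'total_pages': 11}
```

The second call shows why `total // page_size` alone would not be enough: 101 records at 10 per page need an eleventh page for the last record.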
Frontend code:
import { useState, useEffect } from 'react';
import './App.css';

function App() {
  const [blogs, setBlogs] = useState([]);
  const [pagination, setPagination] = useState({
    page: 1,
    pageSize: 10,
    totalPages: 1,
    total: 0
  });

  useEffect(() => {
    fetchBlogs();
  }, [pagination.page, pagination.pageSize]);

  const fetchBlogs = () => {
    fetch(`http://127.0.0.1:8080/blogs/?page=${pagination.page}&page_size=${pagination.pageSize}`)
      .then(response => response.json())
      .then(data => {
        setBlogs(data.blogs);
        setPagination(prev => ({
          ...prev,
          totalPages: data.total_pages,
          total: data.total
        }));
      })
      .catch(error => console.error('Error fetching blogs:', error));
  };

  const handlePageChange = (newPage) => {
    if (newPage >= 1 && newPage <= pagination.totalPages) {
      setPagination(prev => ({ ...prev, page: newPage }));
    }
  };

  const handlePageSizeChange = (e) => {
    const newPageSize = parseInt(e.target.value, 10);
    // Go back to the first page whenever the page size changes
    setPagination(prev => ({
      ...prev,
      page: 1,
      pageSize: newPageSize
    }));
  };

  return (
    <div className="App">
      <header className="App-header">
        <h1>Blog List</h1>
        <div>
          {blogs.map(blog => (
            <div key={blog._id} className="blog-card">
              <h2>{blog.title}</h2>
              <p>{blog.content}</p>
              <p>Author: {blog.author}</p>
              <p>Created at: {blog.created_at}</p>
            </div>
          ))}
        </div>
        <div className="pagination">
          <button onClick={() => handlePageChange(pagination.page - 1)} disabled={pagination.page === 1}>
            Previous
          </button>
          <span>{pagination.page}</span>
          <button onClick={() => handlePageChange(pagination.page + 1)} disabled={pagination.page === pagination.totalPages}>
            Next
          </button>
          <select value={pagination.pageSize} onChange={handlePageSizeChange}>
            <option value="10">10 / page</option>
            <option value="20">20 / page</option>
            <option value="50">50 / page</option>
          </select>
          <p>{pagination.total} items in total, {pagination.totalPages} pages</p>
        </div>
      </header>
    </div>
  );
}

export default App;
3.3 JSON Parameters
Backend code:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel

app = FastAPI()

# MongoDB connection
client = AsyncIOMotorClient("mongodb://zhangdapeng:zhangdapeng520@localhost:27017")
db = client["blogdb"]
collection = db["blogs"]

# Allow requests from any origin
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Blog model
class Blog(BaseModel):
    title: str
    content: str
    author: str
    created_at: str


# Clear the collection, then insert 3 test records
async def create_test_data():
    await collection.delete_many({})
    for i in range(3):
        blog = Blog(
            title=f"Test blog {i + 1}",
            content=f"This is the content of blog {i + 1}",
            author=f"Author {i + 1}",
            created_at="2025-05-10 12:33:33",
        )
        await collection.insert_one(blog.dict())


# Create the test data when the application starts
@app.on_event("startup")
async def startup_event():
    await create_test_data()


# Create a blog from the JSON request body
@app.post("/blogs/")
async def create_blog(blog: Blog):
    result = await collection.insert_one(blog.dict())
    return {"_id": str(result.inserted_id), **blog.dict()}


# Get all blogs
@app.get("/blogs/")
async def get_blogs():
    blogs = await collection.find().to_list(length=100)
    data = []
    for blog in blogs:
        blog["_id"] = str(blog["_id"])
        data.append(blog)
    return data


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8080)
Frontend code:
import { useState, useEffect } from 'react';
import './App.css';

function App() {
  const [blogs, setBlogs] = useState([]);
  const [newBlog, setNewBlog] = useState({ title: '', content: '', author: '', created_at: '' });

  useEffect(() => {
    fetchBlogs();
  }, []);

  const fetchBlogs = () => {
    fetch('http://127.0.0.1:8080/blogs/')
      .then(response => response.json())
      .then(data => setBlogs(data))
      .catch(error => console.error('Error fetching blogs:', error));
  };

  const handleInputChange = (e) => {
    const { name, value } = e.target;
    setNewBlog(prev => ({ ...prev, [name]: value }));
  };

  const handleSubmit = (e) => {
    e.preventDefault();
    // Send the form state as a JSON request body
    fetch('http://127.0.0.1:8080/blogs/', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify(newBlog)
    })
      .then(response => response.json())
      .then(data => {
        setBlogs(prev => [...prev, data]);
        setNewBlog({ title: '', content: '', author: '', created_at: '' });
        fetchBlogs();
      })
      .catch(error => console.error('Error submitting blog:', error));
  };

  return (
    <div className="App">
      <header className="App-header">
        <h1>Blog List</h1>
        <div>
          {blogs.map(blog => (
            <div key={blog._id} className="blog-card">
              <h2>{blog.title}</h2>
              <p>{blog.content}</p>
              <p>Author: {blog.author}</p>
              <p>Created at: {blog.created_at}</p>
            </div>
          ))}
        </div>
        <h2>New Blog</h2>
        <form onSubmit={handleSubmit}>
          <input
            type="text"
            name="title"
            value={newBlog.title}
            onChange={handleInputChange}
            placeholder="Title"
            required
          />
          <textarea
            name="content"
            value={newBlog.content}
            onChange={handleInputChange}
            placeholder="Content"
            required
          />
          <input
            type="text"
            name="author"
            value={newBlog.author}
            onChange={handleInputChange}
            placeholder="Author"
            required
          />
          <input
            type="text"
            name="created_at"
            value={newBlog.created_at}
            onChange={handleInputChange}
            placeholder="Created at"
            required
          />
          <button type="submit">Submit</button>
        </form>
      </header>
    </div>
  );
}

export default App;
3.4 Request Header Parameters
Backend code:
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel

app = FastAPI()

# MongoDB connection
client = AsyncIOMotorClient("mongodb://zhangdapeng:zhangdapeng520@localhost:27017")
db = client["blogdb"]
collection = db["blogs"]

# Allow requests from any origin
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Blog model
class Blog(BaseModel):
    title: str
    content: str
    author: str
    created_at: str


# Clear the collection, then insert 100 test records
async def create_test_data():
    await collection.delete_many({})
    for i in range(100):
        blog = Blog(
            title=f"Test blog {i + 1}",
            content=f"This is the content of blog {i + 1}",
            author=f"Author {i + 1}",
            created_at="2025-05-10 12:33:33",
        )
        await collection.insert_one(blog.dict())


# Create the test data when the application starts
@app.on_event("startup")
async def startup_event():
    await create_test_data()


# Paginated blog query
@app.get("/blogs/")
async def get_blogs(
    request: Request,
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
):
    # Read the API key from the request headers
    api_key = request.headers.get("X-API-Key")
    if not api_key or api_key != "your_api_key":
        raise HTTPException(status_code=401, detail="Invalid API Key")

    # Paginated query
    skip = (page - 1) * page_size
    blogs = await collection.find().skip(skip).limit(page_size).to_list(length=page_size)
    total = await collection.count_documents({})
    total_pages = (total + page_size - 1) // page_size
    data = []
    for blog in blogs:
        blog["_id"] = str(blog["_id"])
        data.append(blog)
    return {
        "blogs": data,
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
    }


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8080)
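The API key check in get_blogs compares two strings with `!=`. One possible refinement, sketched below with the standard library only, is to move the check into a small helper that uses `hmac.compare_digest`, whose running time does not depend on where the two values first differ. The helper name `is_valid_api_key` and the `EXPECTED_API_KEY` constant are assumptions for illustration; the key value is the placeholder from the example above.

```python
import hmac
from typing import Optional

# Placeholder key taken from the example; a real key would not be hard-coded.
EXPECTED_API_KEY = "your_api_key"


def is_valid_api_key(provided: Optional[str]) -> bool:
    """Return True only when a key was sent and it matches the expected one."""
    if not provided:
        # Covers both a missing header (None) and an empty value.
        return False
    # Encode to bytes so non-ASCII input is compared instead of raising.
    return hmac.compare_digest(provided.encode(), EXPECTED_API_KEY.encode())


print(is_valid_api_key("your_api_key"))
print(is_valid_api_key("wrong_key"))
print(is_valid_api_key(None))
```

Inside the endpoint, the result of `request.headers.get("X-API-Key")` could be passed straight to such a helper, with the 401 response raised when it returns False.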
Frontend code:
import { useState, useEffect } from 'react';
import './App.css';

function App() {
  const [blogs, setBlogs] = useState([]);
  const [apiKey, setApiKey] = useState('your_api_key');
  const [pagination, setPagination] = useState({
    page: 1,
    pageSize: 10,
    totalPages: 1,
    total: 0
  });

  useEffect(() => {
    fetchBlogs();
  }, [pagination.page, pagination.pageSize]);

  const fetchBlogs = () => {
    fetch(`http://127.0.0.1:8080/blogs/?page=${pagination.page}&page_size=${pagination.pageSize}`, {
      // Send the API key as a custom request header
      headers: {
        'X-API-Key': apiKey
      }
    })
      .then(response => {
        // A 401 response has no "blogs" field, so stop before updating state
        if (!response.ok) {
          throw new Error(`Request failed with status ${response.status}`);
        }
        return response.json();
      })
      .then(data => {
        setBlogs(data.blogs);
        setPagination(prev => ({
          ...prev,
          totalPages: data.total_pages,
          total: data.total
        }));
      })
      .catch(error => console.error('Error fetching blogs:', error));
  };

  const handlePageChange = (newPage) => {
    if (newPage >= 1 && newPage <= pagination.totalPages) {
      setPagination(prev => ({ ...prev, page: newPage }));
    }
  };

  const handlePageSizeChange = (e) => {
    const newPageSize = parseInt(e.target.value, 10);
    // Go back to the first page whenever the page size changes
    setPagination(prev => ({
      ...prev,
      page: 1,
      pageSize: newPageSize
    }));
  };

  return (
    <div className="App">
      <header className="App-header">
        <h1>Blog List</h1>
        <div>
          {blogs.map(blog => (
            <div key={blog._id} className="blog-card">
              <h2>{blog.title}</h2>
              <p>{blog.content}</p>
              <p>Author: {blog.author}</p>
              <p>Created at: {blog.created_at}</p>
            </div>
          ))}
        </div>
        <div className="pagination">
          <button onClick={() => handlePageChange(pagination.page - 1)} disabled={pagination.page === 1}>
            Previous
          </button>
          <span>{pagination.page}</span>
          <button onClick={() => handlePageChange(pagination.page + 1)} disabled={pagination.page === pagination.totalPages}>
            Next
          </button>
          <select value={pagination.pageSize} onChange={handlePageSizeChange}>
            <option value="10">10 / page</option>
            <option value="20">20 / page</option>
            <option value="50">50 / page</option>
          </select>
          <p>{pagination.total} items in total, {pagination.totalPages} pages</p>
        </div>
      </header>
    </div>
  );
}

export default App;