- 发布于
Python FastAPI现代Web API开发:构建高性能异步API服务
- 作者

- 姓名
- 全能波
- GitHub
- @weicracker
Python FastAPI现代Web API开发:构建高性能异步API服务
FastAPI是一个现代、快速的Python Web框架,专为构建API而设计。它基于标准Python类型提示,提供了自动API文档生成、数据验证和序列化等强大功能。
FastAPI核心特性
项目初始化和基础配置
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
sqlalchemy==2.0.23
alembic==1.13.0
python-multipart==0.0.6
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
aiofiles==23.2.1
redis==5.0.1
celery==5.3.4
pytest==7.4.3
httpx==0.25.2
# app/main.py - FastAPI应用主入口
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from contextlib import asynccontextmanager
import uvicorn
import logging
from typing import AsyncGenerator
from app.core.config import settings
from app.core.database import engine, Base
from app.api.v1.router import api_router
from app.core.redis import redis_client
from app.core.logging import setup_logging
# 设置日志
setup_logging()
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
"""应用生命周期管理"""
# 启动时执行
logger.info("Starting FastAPI application...")
# 创建数据库表
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 初始化Redis连接
await redis_client.ping()
logger.info("Redis connection established")
yield
# 关闭时执行
logger.info("Shutting down FastAPI application...")
await redis_client.close()
# 创建FastAPI应用实例
app = FastAPI(
title=settings.PROJECT_NAME,
description="现代化FastAPI应用示例",
version="1.0.0",
openapi_url=f"{settings.API_V1_STR}/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
lifespan=lifespan
)
# 添加中间件
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_HOSTS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=settings.ALLOWED_HOSTS
)
# 包含API路由
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/")
async def root():
"""根路径健康检查"""
return {
"message": "FastAPI应用运行正常",
"version": "1.0.0",
"docs": "/docs"
}
@app.get("/health")
async def health_check():
"""健康检查端点"""
try:
# 检查数据库连接
async with engine.begin() as conn:
await conn.execute("SELECT 1")
# 检查Redis连接
await redis_client.ping()
return {
"status": "healthy",
"database": "connected",
"redis": "connected"
}
except Exception as e:
logger.error(f"Health check failed: {e}")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Service unhealthy"
)
if __name__ == "__main__":
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)
配置管理
# app/core/config.py - 应用配置
from pydantic_settings import BaseSettings
from typing import List, Optional
import secrets
class Settings(BaseSettings):
# 基础配置
PROJECT_NAME: str = "FastAPI Modern App"
VERSION: str = "1.0.0"
API_V1_STR: str = "/api/v1"
# 安全配置
SECRET_KEY: str = secrets.token_urlsafe(32)
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 7 # 7天
ALGORITHM: str = "HS256"
# 数据库配置
DATABASE_URL: str = "postgresql+asyncpg://user:password@localhost/fastapi_db"
# Redis配置
REDIS_URL: str = "redis://localhost:6379/0"
# CORS配置
ALLOWED_HOSTS: List[str] = ["*"]
# 邮件配置
SMTP_TLS: bool = True
SMTP_PORT: Optional[int] = None
SMTP_HOST: Optional[str] = None
SMTP_USER: Optional[str] = None
SMTP_PASSWORD: Optional[str] = None
EMAILS_FROM_EMAIL: Optional[str] = None
# 文件上传配置
MAX_FILE_SIZE: int = 10 * 1024 * 1024 # 10MB
UPLOAD_DIR: str = "uploads"
# 分页配置
DEFAULT_PAGE_SIZE: int = 20
MAX_PAGE_SIZE: int = 100
# 缓存配置
CACHE_TTL: int = 300 # 5分钟
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()
数据库模型和连接
# app/core/database.py - 数据库配置
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
from typing import AsyncGenerator
from app.core.config import settings
# 创建异步数据库引擎
engine = create_async_engine(
settings.DATABASE_URL,
echo=True,
future=True,
pool_pre_ping=True,
pool_recycle=300,
)
# 创建会话工厂
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
# 创建基础模型类
Base = declarative_base()
# 数据库会话依赖
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
# 用户模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关系
posts = relationship("Post", back_populates="author")
# 文章模型
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
content = Column(Text, nullable=False)
summary = Column(String)
slug = Column(String, unique=True, index=True, nullable=False)
is_published = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 外键
author_id = Column(Integer, ForeignKey("users.id"))
# 关系
author = relationship("User", back_populates="posts")
Pydantic模型和数据验证
# app/schemas/user.py - 用户相关的Pydantic模型
from pydantic import BaseModel, EmailStr, validator, Field
from typing import Optional, List
from datetime import datetime
class UserBase(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
full_name: Optional[str] = Field(None, max_length=100)
class UserCreate(UserBase):
password: str = Field(..., min_length=8, max_length=100)
@validator('password')
def validate_password(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('密码必须包含至少一个大写字母')
if not any(c.islower() for c in v):
raise ValueError('密码必须包含至少一个小写字母')
if not any(c.isdigit() for c in v):
raise ValueError('密码必须包含至少一个数字')
return v
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = Field(None, min_length=3, max_length=50)
full_name: Optional[str] = Field(None, max_length=100)
is_active: Optional[bool] = None
class UserInDB(UserBase):
id: int
is_active: bool
is_superuser: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class User(UserInDB):
pass
class UserWithPosts(User):
posts: List['PostInDB'] = []
# 文章相关模型
class PostBase(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
content: str = Field(..., min_length=1)
summary: Optional[str] = Field(None, max_length=500)
slug: str = Field(..., min_length=1, max_length=100)
class PostCreate(PostBase):
pass
class PostUpdate(BaseModel):
title: Optional[str] = Field(None, min_length=1, max_length=200)
content: Optional[str] = Field(None, min_length=1)
summary: Optional[str] = Field(None, max_length=500)
slug: Optional[str] = Field(None, min_length=1, max_length=100)
is_published: Optional[bool] = None
class PostInDB(PostBase):
id: int
is_published: bool
created_at: datetime
updated_at: datetime
author_id: int
class Config:
from_attributes = True
class Post(PostInDB):
author: User
# 认证相关模型
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
username: Optional[str] = None
class LoginRequest(BaseModel):
username: str
password: str
# 通用响应模型
class ResponseModel(BaseModel):
success: bool = True
message: str = "操作成功"
data: Optional[dict] = None
class PaginatedResponse(BaseModel):
items: List[dict]
total: int
page: int
size: int
pages: int
认证和授权系统
# app/core/security.py - 安全相关功能
from datetime import datetime, timedelta
from typing import Optional, Union
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_db
from app.models.user import User
from app.crud.user import user_crud
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT Bearer认证
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""创建访问令牌"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire, "type": "access"})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def create_refresh_token(data: dict) -> str:
"""创建刷新令牌"""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=settings.REFRESH_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire, "type": "refresh"})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_token(token: str, token_type: str = "access") -> Optional[str]:
"""验证令牌"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
username: str = payload.get("sub")
token_type_in_token: str = payload.get("type")
if username is None or token_type_in_token != token_type:
return None
return username
except JWTError:
return None
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db)
) -> User:
"""获取当前用户"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
username = verify_token(credentials.credentials)
if username is None:
raise credentials_exception
user = await user_crud.get_by_username(db, username=username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
"""获取当前活跃用户"""
if not current_user.is_active:
raise HTTPException(status_code=400, detail="用户账户已被禁用")
return current_user
async def get_current_superuser(current_user: User = Depends(get_current_user)) -> User:
"""获取当前超级用户"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="权限不足"
)
return current_user
CRUD操作和服务层
# app/crud/base.py - 基础CRUD操作
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete, func
from sqlalchemy.orm import selectinload
from app.core.database import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
self.model = model
async def get(self, db: AsyncSession, id: Any) -> Optional[ModelType]:
"""根据ID获取单个对象"""
result = await db.execute(select(self.model).where(self.model.id == id))
return result.scalar_one_or_none()
async def get_multi(
self,
db: AsyncSession,
*,
skip: int = 0,
limit: int = 100,
**filters
) -> List[ModelType]:
"""获取多个对象"""
query = select(self.model)
# 应用过滤条件
for key, value in filters.items():
if hasattr(self.model, key) and value is not None:
query = query.where(getattr(self.model, key) == value)
query = query.offset(skip).limit(limit)
result = await db.execute(query)
return result.scalars().all()
async def count(self, db: AsyncSession, **filters) -> int:
"""计算对象数量"""
query = select(func.count(self.model.id))
# 应用过滤条件
for key, value in filters.items():
if hasattr(self.model, key) and value is not None:
query = query.where(getattr(self.model, key) == value)
result = await db.execute(query)
return result.scalar()
async def create(self, db: AsyncSession, *, obj_in: CreateSchemaType) -> ModelType:
"""创建新对象"""
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj
async def update(
self,
db: AsyncSession,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
"""更新对象"""
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj
async def remove(self, db: AsyncSession, *, id: int) -> ModelType:
"""删除对象"""
obj = await self.get(db, id=id)
await db.delete(obj)
await db.commit()
return obj
API路由和端点
# app/api/v1/endpoints/users.py - 用户相关API端点
from typing import List, Any
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.security import get_current_active_user, get_current_superuser
from app.crud.user import user_crud
from app.schemas.user import User, UserCreate, UserUpdate, UserWithPosts
from app.schemas.common import PaginatedResponse
from app.models.user import User as UserModel
router = APIRouter()
@router.get("/", response_model=PaginatedResponse)
async def read_users(
db: AsyncSession = Depends(get_db),
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
current_user: UserModel = Depends(get_current_superuser),
) -> Any:
"""获取用户列表(仅超级用户)"""
users = await user_crud.get_multi(db, skip=skip, limit=limit)
total = await user_crud.count(db)
return PaginatedResponse(
items=[User.from_orm(user) for user in users],
total=total,
page=skip // limit + 1,
size=limit,
pages=(total + limit - 1) // limit
)
@router.post("/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(
*,
db: AsyncSession = Depends(get_db),
user_in: UserCreate,
current_user: UserModel = Depends(get_current_superuser),
) -> Any:
"""创建新用户(仅超级用户)"""
user = await user_crud.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="该邮箱已被注册",
)
user = await user_crud.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=400,
detail="该用户名已被使用",
)
user = await user_crud.create(db, obj_in=user_in)
return user
@router.get("/me", response_model=UserWithPosts)
async def read_user_me(
db: AsyncSession = Depends(get_db),
current_user: UserModel = Depends(get_current_active_user),
) -> Any:
"""获取当前用户信息"""
user_with_posts = await user_crud.get_with_posts(db, user_id=current_user.id)
return user_with_posts
@router.put("/me", response_model=User)
async def update_user_me(
*,
db: AsyncSession = Depends(get_db),
user_in: UserUpdate,
current_user: UserModel = Depends(get_current_active_user),
) -> Any:
"""更新当前用户信息"""
# 检查邮箱和用户名是否已被其他用户使用
if user_in.email:
existing_user = await user_crud.get_by_email(db, email=user_in.email)
if existing_user and existing_user.id != current_user.id:
raise HTTPException(
status_code=400,
detail="该邮箱已被其他用户使用",
)
if user_in.username:
existing_user = await user_crud.get_by_username(db, username=user_in.username)
if existing_user and existing_user.id != current_user.id:
raise HTTPException(
status_code=400,
detail="该用户名已被其他用户使用",
)
user = await user_crud.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/{user_id}", response_model=User)
async def read_user(
user_id: int,
current_user: UserModel = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
) -> Any:
"""根据ID获取用户信息"""
user = await user_crud.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="用户不存在",
)
# 非超级用户只能查看自己的信息
if user == current_user:
return user
if not current_user.is_superuser:
raise HTTPException(
status_code=403,
detail="权限不足",
)
return user
@router.put("/{user_id}", response_model=User)
async def update_user(
*,
db: AsyncSession = Depends(get_db),
user_id: int,
user_in: UserUpdate,
current_user: UserModel = Depends(get_current_superuser),
) -> Any:
"""更新用户信息(仅超级用户)"""
user = await user_crud.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="用户不存在",
)
user = await user_crud.update(db, db_obj=user, obj_in=user_in)
return user
@router.delete("/{user_id}")
async def delete_user(
*,
db: AsyncSession = Depends(get_db),
user_id: int,
current_user: UserModel = Depends(get_current_superuser),
) -> Any:
"""删除用户(仅超级用户)"""
user = await user_crud.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="用户不存在",
)
if user.id == current_user.id:
raise HTTPException(
status_code=400,
detail="不能删除自己的账户",
)
user = await user_crud.remove(db, id=user_id)
return {"message": "用户删除成功"}
总结
FastAPI现代Web API开发的核心要点:
🎯 核心特性
- 类型安全:基于Python类型提示的自动验证
- 异步支持:原生异步编程支持
- 自动文档:OpenAPI和JSON Schema自动生成
- 高性能:基于Starlette和Pydantic的高性能框架
✅ 最佳实践
- 项目结构化组织
- 配置管理和环境变量
- 数据库异步操作
- 认证和授权系统
- CRUD操作封装
🚀 高级功能
- 中间件和生命周期管理
- 缓存和性能优化
- 错误处理和日志记录
- 测试和部署策略
💡 开发建议
- 使用Pydantic进行数据验证
- 实现完整的认证授权
- 合理的错误处理机制
- 完善的API文档
掌握FastAPI,构建现代化的高性能Web API服务!
FastAPI代表了Python Web开发的未来方向,其现代化的设计理念和强大的功能使其成为构建API服务的首选框架。