FastAPI与SQLAlchemy数据库集成与CRUD操作

avatar
cmdragon 大乘
image image

扫描二维码关注或者微信搜一搜:编程智域 前端至全栈交流与成长

探索数千个预构建的 AI 应用,开启你的下一个伟大创意

1. FastAPI 与 SQLAlchemy 同步数据库集成基础

1.1 环境准备与安装

首先创建虚拟环境并安装必要依赖:

1
2
3
4
python -m venv venv
source venv/bin/activate # Linux/Mac
venv\Scripts\activate.bat # Windows
pip install fastapi uvicorn sqlalchemy pymysql

1.2 数据库连接配置

database.py中配置核心数据库连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "mysql+pymysql://user:password@localhost/mydatabase"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
pool_size=20,
max_overflow=0,
pool_pre_ping=True
)

SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
expire_on_commit=False
)

1.3 模型定义与关系映射

models.py中定义数据模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from sqlalchemy import Column, Integer, String
from database import Base


class User(Base):
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True)
age = Column(Integer, default=18)

def __repr__(self):
return f"<User(name='{self.name}', email='{self.email}')>"

2. CRUD 操作标准实现模式

2.1 数据访问层封装

创建repository.py实现CRUD操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from sqlalchemy.orm import Session
from models import User


class UserRepository:
@staticmethod
def create_user(db: Session, user_data: dict):
""" 创建用户 """
db_user = User(**user_data)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user

@staticmethod
def get_user(db: Session, user_id: int):
""" 根据ID获取用户 """
return db.query(User).filter(User.id == user_id).first()

@staticmethod
def update_user(db: Session, user_id: int, update_data: dict):
""" 更新用户信息 """
db_user = db.query(User).filter(User.id == user_id).first()
if db_user:
for key, value in update_data.items():
setattr(db_user, key, value)
db.commit()
db.refresh(db_user)
return db_user

@staticmethod
def delete_user(db: Session, user_id: int):
""" 删除用户 """
db_user = db.query(User).filter(User.id == user_id).first()
if db_user:
db.delete(db_user)
db.commit()
return True
return False

2.2 路由层实现

main.py中定义API端点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from models import Base
from database import engine, SessionLocal
from repository import UserRepository
from pydantic import BaseModel

Base.metadata.create_all(bind=engine)

app = FastAPI()


# 依赖注入获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()


class UserCreate(BaseModel):
name: str
email: str
age: int = 18


@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
return UserRepository.create_user(db, user.dict())


@app.get("/users/{user_id}")
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = UserRepository.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user

3. Session 生命周期管理

3.1 Session 线程安全策略

通过依赖注入系统保证每个请求独立会话:

1
2
3
4
5
6
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

3.2 事务管理最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def transfer_funds(db: Session, from_id: int, to_id: int, amount: float):
try:
from_user = UserRepository.get_user(db, from_id)
to_user = UserRepository.get_user(db, to_id)

if from_user.balance < amount:
raise ValueError("Insufficient funds")

from_user.balance -= amount
to_user.balance += amount

db.commit()
except Exception as e:
db.rollback()
raise e

4. 常见错误处理

4.1 422 请求验证错误

示例场景

1
2
3
4
5
6
7
8
9
10
11
12
{
"detail": [
{
"loc": [
"body",
"age"
],
"msg": "value is not a valid integer",
"type": "type_error.integer"
}
]
}

解决方案

  1. 检查请求体是否匹配Pydantic模型定义
  2. 使用OpenAPI文档进行测试
  3. 添加中间件捕获验证错误:
1
2
3
4
5
6
7
8
9
from fastapi.exceptions import RequestValidationError


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
return JSONResponse(
status_code=400,
content={"detail": exc.errors(), "body": exc.body},
)

课后Quiz

问题1:以下哪种方式可以有效防止SQL注入攻击?
A) 使用字符串拼接SQL语句
B) 使用ORM的查询参数化功能
C) 在数据库连接字符串添加特殊参数
D) 禁用所有输入验证

答案:B) 正确。SQLAlchemy等ORM框架会自动进行参数化查询,将用户输入作为参数传递而不是直接拼接到SQL语句中。

问题2:为什么需要在finally块中关闭数据库会话?
A) 为了提升查询性能
B) 确保会话在任何情况下都会被正确关闭
C) 防止其他请求使用该会话
D) 满足数据库连接池的要求

答案:B) 正确。无论是否发生异常,finally块中的代码都会执行,保证会话资源的正确释放。

常见报错解决方案

**报错1
**:sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2003, "Can't connect to MySQL server on 'localhost'")

原因分析

  • 数据库服务未启动
  • 连接参数(用户名/密码/端口)错误
  • 网络防火墙阻止连接

解决方案

  1. 检查MySQL服务状态
  2. 验证连接字符串参数
  3. 使用telnet测试端口连通性
  4. 添加连接超时参数:
1
create_engine(connect_args={"connect_timeout": 10})

**报错2
**:sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush.

原因分析

  • 数据库操作违反约束(如唯一性约束)
  • 事务未正确处理异常

解决方案

  1. 检查数据完整性约束
  2. 在事务代码块中添加try/except
  3. 执行显式回滚操作
  4. 使用session.expire_all()重置会话状态

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:

往期文章归档: