@validator("data_stream")
async def process_stream(cls, v):
    """Double every item of the incoming stream and keep results below 100.

    Args:
        v: Raw ``data_stream`` field value; it is handed to
            ``aiostream.stream.iterate``.

    Returns:
        Whatever awaiting the final ``.list()`` call of the pipeline yields.
    """
    # NOTE(review): using the stream as an async context manager and the
    # chained ``map``/``filter``/``throttle``/``list`` calls are kept exactly
    # as the original wrote them; confirm they exist in the aiostream version
    # in use, and that the validation framework awaits coroutine validators.
    async with aiostream.stream.iterate(v) as stream:
        return await (
            stream
            .map(lambda x: x * 2)
            .filter(lambda x: x < 100)
            .throttle(10)  # rate limit: 10 items per second
            .list()
        )
4.2 异步动态依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14
class PaymentValidator(BaseModel):
    """Payment model with a ``user_id`` and an optional ``balance``.

    NOTE(review): both validators are coroutines; confirm that the validation
    framework in use actually awaits ``async def`` validators.
    """

    user_id: int
    balance: float | None = None

    @validator("user_id")
    async def fetch_balance(cls, v):
        """Request ``/users/{v}/balance`` and return the decoded JSON body.

        NOTE(review): a validator's return value replaces the field it is
        attached to (``user_id`` here), so this does not fill ``balance`` --
        confirm that this is intended.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(f"/users/{v}/balance") as resp:
                return await resp.json()

    @validator("balance", always=True)
    async def check_sufficient(cls, v):
        """Reject a missing balance or one below the minimum of 100.

        Returns:
            The balance unchanged when it passes the check.

        Raises:
            ValueError: If ``v`` is ``None`` or less than 100.
        """
        # ``always=True`` runs this check on the ``None`` default as well, so
        # guard it explicitly instead of comparing ``None`` with a number.
        if v is None or v < 100:
            raise ValueError("余额不足最低限额")
        # Hand the value back; without a return the field would become None.
        return v
第五章:错误处理与优化
5.1 异步超时控制
1 2 3 4 5 6 7 8 9 10 11 12
class TimeoutValidator(BaseModel):
    """Model whose ``api_url`` is probed with a GET request under a timeout.

    NOTE(review): the validator is a coroutine; confirm that the validation
    framework in use actually awaits ``async def`` validators.
    """

    api_url: str

    @validator("api_url")
    async def validate_with_timeout(cls, v):
        """Issue a GET request to ``v`` inside a 5-second ``asyncio.timeout``.

        Returns:
            ``v`` when the response status is 200, otherwise the string
            ``"invalid"``.

        Raises:
            ValueError: If the timeout expires before the request completes.
        """
        try:
            async with asyncio.timeout(5):
                async with aiohttp.ClientSession() as session:
                    async with session.get(v) as resp:
                        return v if resp.status == 200 else "invalid"
        except TimeoutError as exc:
            # Keep the original timeout as the cause for easier debugging.
            raise ValueError("API响应超时") from exc
5.2 异步错误聚合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
from pydantic import ValidationError
class BulkValidator(BaseModel):
    """Model holding a list of string items that are checked one by one.

    NOTE(review): the validator is a coroutine; confirm that the validation
    framework in use actually awaits ``async def`` validators.
    """

    items: list[str]

    @validator("items")
    async def bulk_check(cls, v):
        """Await ``external_api.check`` for each item and aggregate failures.

        Returns:
            ``v`` unchanged when no check raised.

        Raises:
            ValueError: If any check raised; the message holds one
                ``"<item>: <error>"`` line per failure, joined by newlines.
        """
        errors = []
        for item in v:
            try:
                await external_api.check(item)
            except Exception as e:
                # Deliberately broad: record every failure and keep going so
                # that all problems are reported together.
                errors.append(f"{item}: {e}")
        if errors:
            raise ValueError("\n".join(errors))
        return v
课后Quiz
Q1:异步校验器的核心关键字是? A) async/await B) thread C) multiprocessing