Это пятая, заключительная статья серии о Pydantic v2. Часть 1 — миграция, часть 2 — patterns, часть 3 — производительность, часть 4 — микросервисы. Здесь — продвинутые темы и интеграции, которые выходят за рамки типичного REST API.
📚 Серия статей: Pydantic v2 в Production
- Часть 1: Миграция v1→v2 — процесс, инциденты, метрики
- Часть 2: Production Patterns — ConfigDict, валидация, сериализация
- Часть 3: Производительность — профилирование, оптимизации, бенчмарки
- Часть 4: Микросервисы — schema registry, версионирование, FastAPI
- Часть 5: Advanced Topics ← вы здесь
⚡ TL;DR (для торопящихся)
- Async validation: Dependency injection (recommended) vs async wrapper vs batch validation с semaphore
- GraphQL + Strawberry: Pydantic как единый источник истины для схем, автоматическая конвертация через
from_pydantic/to_pydantic - CLI с Typer: Валидация аргументов, YAML/JSON конфигов, generate-config команды
- Message brokers: Type-safe Kafka events, RabbitMQ с Pydantic, DLQ для невалидных сообщений
- Custom plugins: Кастомные типы через
__get_pydantic_core_schema__, расширение валидации и сериализации - Главный урок: Pydantic универсален — от REST API до GraphQL, от CLI до event-driven архитектуры
Для кого: Full-stack разработчиков, тех кто использует GraphQL/gRPC/message brokers, команд с нестандартными требованиями.
Async Validation: I/O операции без блокировки
Проблема: синхронные валидаторы блокируют event loop
from pydantic import BaseModel, field_validator
import httpx
# ❌ Плохо: синхронный HTTP запрос в валидаторе
class PromoCode(BaseModel):
code: str
@field_validator('code')
@classmethod
def validate_code(cls, v: str) -> str:
# Блокирует event loop на время HTTP запроса!
response = httpx.get(f"http://promo-service/validate/{v}")
if response.status_code != 200:
raise ValueError(f'Invalid promo code: {v}')
return v
# В async endpoint это катастрофа:
@app.post("/orders")
async def create_order(order_data: dict):
# Валидация заблокирует весь event loop
order = PromoCode(**order_data) # ← блокировка!
return orderРешение 1: Dependency Injection (рекомендуется)
from fastapi import Depends, HTTPException
import httpx
class PromoCode(BaseModel):
"""Модель без I/O в валидаторе."""
code: str
async def validate_promo_code_async(
order: PromoCode,
http_client: httpx.AsyncClient = Depends(get_http_client)
) -> PromoCode:
"""Async dependency для валидации промокода."""
try:
response = await http_client.get(
f"http://promo-service/validate/{order.code}",
timeout=2.0
)
if response.status_code != 200:
raise HTTPException(
status_code=400,
detail=f"Invalid promo code: {order.code}"
)
# Можем добавить данные из response в order
promo_data = response.json()
order.discount_percent = promo_data.get('discount', 0)
return order
except httpx.TimeoutException:
raise HTTPException(
status_code=504,
detail="Promo service timeout"
)
@app.post("/orders")
async def create_order(
order: PromoCode = Depends(validate_promo_code_async)
):
# order уже провалидирован асинхронно
return orderРешение 2: Custom validator с async wrapper
from pydantic import BaseModel, model_validator
from typing import Any
import asyncio
class OrderWithAsyncValidation(BaseModel):
"""Модель с async валидацией через wrapper."""
product_id: str
quantity: int
promo_code: str | None = None
_validated_async: bool = False
@model_validator(mode='after')
def ensure_async_validated(self):
"""Проверяем, что async валидация прошла."""
if not self._validated_async:
raise ValueError(
'Model must be validated with validate_async() method'
)
return self
@classmethod
async def validate_async(cls, data: dict) -> "OrderWithAsyncValidation":
"""Async конструктор с валидацией."""
# Сначала создаём без async валидации
obj = cls.model_construct(**data)
# Async валидация
if obj.promo_code:
async with httpx.AsyncClient() as client:
response = await client.get(
f"http://promo-service/validate/{obj.promo_code}"
)
if response.status_code != 200:
raise ValueError(f'Invalid promo code: {obj.promo_code}')
# Проверяем наличие продукта
if obj.product_id:
async with httpx.AsyncClient() as client:
response = await client.get(
f"http://catalog-service/products/{obj.product_id}"
)
if response.status_code != 200:
raise ValueError(f'Product not found: {obj.product_id}')
# Помечаем как провалидированный
obj._validated_async = True
return obj
# Использование
@app.post("/orders")
async def create_order(order_data: dict):
# Async валидация
order = await OrderWithAsyncValidation.validate_async(order_data)
return orderРешение 3: Batch async validation
from pydantic import BaseModel
from typing import TypeVar, Generic
import asyncio
T = TypeVar('T', bound=BaseModel)
class AsyncValidator(Generic[T]):
"""Batch async валидатор для списков."""
def __init__(self, model: type[T]):
self.model = model
async def validate_many(
self,
data_list: list[dict],
max_concurrent: int = 10
) -> list[T]:
"""Валидация списка объектов с ограничением concurrency."""
semaphore = asyncio.Semaphore(max_concurrent)
async def validate_one(data: dict) -> T:
async with semaphore:
# Предположим, что модель имеет validate_async
return await self.model.validate_async(data)
# Запускаем все валидации параллельно (с ограничением)
tasks = [validate_one(data) for data in data_list]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Разделяем успешные и ошибки
valid = []
errors = []
for i, result in enumerate(results):
if isinstance(result, Exception):
errors.append({'index': i, 'error': str(result)})
else:
valid.append(result)
if errors:
raise ValueError(f'Validation failed for {len(errors)} items: {errors}')
return valid
# Использование
@app.post("/orders/batch")
async def create_orders_batch(orders_data: list[dict]):
validator = AsyncValidator(OrderWithAsyncValidation)
# Валидируем все заказы параллельно (макс 10 одновременно)
orders = await validator.validate_many(orders_data, max_concurrent=10)
return {"created": len(orders), "orders": orders}GraphQL + Strawberry: Pydantic в качестве input/output
Базовая интеграция Pydantic + Strawberry
import strawberry
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
# Pydantic модели для бизнес-логики
class UserInput(BaseModel):
"""Input модель с валидацией."""
username: str = Field(min_length=3, max_length=20, pattern=r'^[a-zA-Z0-9_]+$')
email: str = Field(pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
age: int = Field(ge=18, le=120)
class UserModel(BaseModel):
"""Внутренняя модель пользователя."""
id: int
username: str
email: str
age: int
created_at: datetime
# Strawberry типы для GraphQL
@strawberry.experimental.pydantic.input(model=UserInput)
class UserInputType:
"""GraphQL input type из Pydantic."""
username: strawberry.auto
email: strawberry.auto
age: strawberry.auto
@strawberry.experimental.pydantic.type(model=UserModel)
class UserType:
"""GraphQL type из Pydantic."""
id: strawberry.auto
username: strawberry.auto
email: strawberry.auto
age: strawberry.auto
created_at: strawberry.auto
# GraphQL schema
@strawberry.type
class Query:
@strawberry.field
def user(self, id: int) -> Optional[UserType]:
"""Получить пользователя по ID."""
user = db.query(User).get(id)
if not user:
return None
# Pydantic валидация ORM → Pydantic → Strawberry
user_model = UserModel.model_validate(user)
return UserType.from_pydantic(user_model)
@strawberry.type
class Mutation:
@strawberry.mutation
def create_user(self, user_input: UserInputType) -> UserType:
"""Создать пользователя."""
# Strawberry → Pydantic (с валидацией)
user_data = user_input.to_pydantic()
# Бизнес-логика
user = User(**user_data.model_dump())
db.add(user)
db.commit()
# Pydantic → Strawberry
user_model = UserModel.model_validate(user)
return UserType.from_pydantic(user_model)
schema = strawberry.Schema(query=Query, mutation=Mutation)Сложные GraphQL типы с Pydantic валидацией
from pydantic import BaseModel, model_validator
from typing import Literal
import strawberry
from enum import Enum
# Pydantic модели
class AddressInput(BaseModel):
street: str = Field(min_length=5, max_length=200)
city: str = Field(min_length=2, max_length=100)
country: str = Field(min_length=2, max_length=2, pattern=r'^[A-Z]{2}$')
postal_code: str
class PaymentMethodInput(BaseModel):
type: Literal["card", "bank_transfer", "crypto"]
# Conditional fields
card_number: str | None = None
iban: str | None = None
wallet_address: str | None = None
@model_validator(mode='after')
def validate_payment_fields(self):
"""Проверка соответствия полей типу платежа."""
if self.type == "card" and not self.card_number:
raise ValueError("card_number required for card payment")
if self.type == "bank_transfer" and not self.iban:
raise ValueError("iban required for bank transfer")
if self.type == "crypto" and not self.wallet_address:
raise ValueError("wallet_address required for crypto payment")
return self
class OrderInput(BaseModel):
items: list[dict] = Field(min_length=1)
shipping_address: AddressInput
payment_method: PaymentMethodInput
@model_validator(mode='after')
def validate_order_total(self):
"""Валидация общей суммы заказа."""
total = sum(item.get('quantity', 0) * item.get('price', 0) for item in self.items)
if total <= 0:
raise ValueError("Order total must be positive")
return self
# Strawberry types
@strawberry.experimental.pydantic.input(model=AddressInput)
class AddressInputType:
street: strawberry.auto
city: strawberry.auto
country: strawberry.auto
postal_code: strawberry.auto
@strawberry.experimental.pydantic.input(model=PaymentMethodInput)
class PaymentMethodInputType:
type: strawberry.auto
card_number: strawberry.auto
iban: strawberry.auto
wallet_address: strawberry.auto
@strawberry.experimental.pydantic.input(model=OrderInput)
class OrderInputType:
items: strawberry.auto
shipping_address: strawberry.auto
payment_method: strawberry.auto
@strawberry.type
class Mutation:
@strawberry.mutation
def create_order(self, order_input: OrderInputType) -> str:
"""Создать заказ с комплексной валидацией."""
# Pydantic валидация сработает автоматически
order_data = order_input.to_pydantic()
# Все валидаторы уже отработали:
# - Conditional validation (payment method)
# - Cross-field validation (order total)
# - Field constraints (lengths, patterns)
# Бизнес-логика
order = create_order_in_db(order_data)
return f"Order {order.id} created"GraphQL subscriptions с Pydantic
from typing import AsyncGenerator
import strawberry
from pydantic import BaseModel
import asyncio
class OrderUpdate(BaseModel):
"""Модель обновления заказа."""
order_id: int
status: Literal["pending", "processing", "shipped", "delivered"]
timestamp: datetime
message: str
@strawberry.experimental.pydantic.type(model=OrderUpdate)
class OrderUpdateType:
order_id: strawberry.auto
status: strawberry.auto
timestamp: strawberry.auto
message: strawberry.auto
@strawberry.type
class Subscription:
@strawberry.subscription
async def order_updates(
self,
order_id: int
) -> AsyncGenerator[OrderUpdateType, None]:
"""Подписка на обновления заказа."""
# Симуляция real-time обновлений
while True:
# Получаем обновление из Redis/Kafka/WebSocket
update_data = await get_order_update(order_id)
if update_data:
# Pydantic валидация
update = OrderUpdate(**update_data)
# Отправляем клиенту
yield OrderUpdateType.from_pydantic(update)
await asyncio.sleep(1)
schema = strawberry.Schema(
query=Query,
mutation=Mutation,
subscription=Subscription
)CLI приложения с Typer + Pydantic
Базовая интеграция Typer + Pydantic
import typer
from pydantic import BaseModel, Field, ValidationError
from typing import Optional
from pathlib import Path
app = typer.Typer()
# Pydantic модель для конфигурации CLI
class DeployConfig(BaseModel):
"""Конфигурация deployment."""
environment: Literal["dev", "staging", "production"]
region: str = Field(pattern=r'^[a-z]{2}-[a-z]+-\d+$') # e.g., us-east-1
instances: int = Field(ge=1, le=100)
config_file: Path
@model_validator(mode='after')
def validate_config_file_exists(self):
"""Проверяем существование файла конфигурации."""
if not self.config_file.exists():
raise ValueError(f"Config file not found: {self.config_file}")
return self
@app.command()
def deploy(
environment: str = typer.Option(..., help="Environment: dev/staging/production"),
region: str = typer.Option(..., help="AWS region (e.g., us-east-1)"),
instances: int = typer.Option(1, help="Number of instances"),
config_file: Path = typer.Option(Path("config.yaml"), help="Config file path"),
):
"""Deploy application with validated configuration."""
try:
# Pydantic валидация CLI аргументов
config = DeployConfig(
environment=environment,
region=region,
instances=instances,
config_file=config_file
)
typer.echo(f"✓ Configuration validated")
typer.echo(f" Environment: {config.environment}")
typer.echo(f" Region: {config.region}")
typer.echo(f" Instances: {config.instances}")
# Deployment logic
perform_deployment(config)
typer.echo(typer.style("✓ Deployment successful!", fg=typer.colors.GREEN, bold=True))
except ValidationError as e:
typer.echo(typer.style("✗ Configuration errors:", fg=typer.colors.RED, bold=True))
for error in e.errors():
field = '.'.join(str(loc) for loc in error['loc'])
typer.echo(f" • {field}: {error['msg']}")
raise typer.Exit(code=1)
if __name__ == "__main__":
app()CLI с JSON/YAML конфигурацией через Pydantic
import typer
from pydantic import BaseModel, Field
from pathlib import Path
import yaml
import json
app = typer.Typer()
class DatabaseConfig(BaseModel):
"""Конфигурация БД."""
host: str
port: int = Field(ge=1, le=65535)
database: str
username: str
password: str = Field(min_length=8)
class ServerConfig(BaseModel):
"""Конфигурация сервера."""
host: str = "0.0.0.0"
port: int = Field(default=8000, ge=1, le=65535)
workers: int = Field(default=4, ge=1, le=64)
log_level: Literal["debug", "info", "warning", "error"] = "info"
class AppConfig(BaseModel):
"""Полная конфигурация приложения."""
app_name: str
version: str = Field(pattern=r'^\d+\.\d+\.\d+$')
database: DatabaseConfig
server: ServerConfig
@classmethod
def from_file(cls, file_path: Path) -> "AppConfig":
"""Загрузка конфигурации из YAML/JSON файла."""
with open(file_path) as f:
if file_path.suffix in ['.yaml', '.yml']:
data = yaml.safe_load(f)
elif file_path.suffix == '.json':
data = json.load(f)
else:
raise ValueError(f"Unsupported config format: {file_path.suffix}")
return cls(**data)
@app.command()
def start(
config_file: Path = typer.Option(
Path("config.yaml"),
help="Path to configuration file (YAML or JSON)"
),
validate_only: bool = typer.Option(
False,
"--validate-only",
help="Only validate configuration without starting"
)
):
"""Start application server with configuration validation."""
try:
# Загрузка и валидация конфигурации
config = AppConfig.from_file(config_file)
typer.echo(f"✓ Configuration loaded from {config_file}")
typer.echo(f" App: {config.app_name} v{config.version}")
typer.echo(f" Database: {config.database.username}@{config.database.host}:{config.database.port}")
typer.echo(f" Server: {config.server.host}:{config.server.port} ({config.server.workers} workers)")
if validate_only:
typer.echo(typer.style("✓ Configuration is valid!", fg=typer.colors.GREEN, bold=True))
return
# Запуск сервера
start_server(config)
except ValidationError as e:
typer.echo(typer.style(f"✗ Invalid configuration in {config_file}:", fg=typer.colors.RED, bold=True))
for error in e.errors():
field = '.'.join(str(loc) for loc in error['loc'])
typer.echo(f" • {field}: {error['msg']}")
raise typer.Exit(code=1)
except FileNotFoundError:
typer.echo(typer.style(f"✗ Config file not found: {config_file}", fg=typer.colors.RED))
raise typer.Exit(code=1)
@app.command()
def generate_config(
output: Path = typer.Option(Path("config.yaml"), help="Output file path"),
format: Literal["yaml", "json"] = typer.Option("yaml", help="Output format")
):
"""Generate example configuration file."""
# Создаём пример конфигурации
example_config = AppConfig(
app_name="my-app",
version="1.0.0",
database=DatabaseConfig(
host="localhost",
port=5432,
database="mydb",
username="user",
password="secure_password"
),
server=ServerConfig()
)
# Сохраняем в файл
with open(output, 'w') as f:
if format == "yaml":
yaml.dump(example_config.model_dump(), f, default_flow_style=False)
else:
json.dump(example_config.model_dump(), f, indent=2)
typer.echo(f"✓ Example configuration saved to {output}")
if __name__ == "__main__":
app()Message Brokers: Kafka + RabbitMQ
Kafka с Pydantic схемами
from pydantic import BaseModel, Field
from kafka import KafkaProducer, KafkaConsumer
from datetime import datetime
import json
# Pydantic модели для Kafka messages
class OrderCreatedEvent(BaseModel):
"""Event: новый заказ создан."""
event_type: Literal["order.created"] = "order.created"
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = Field(default_factory=datetime.utcnow)
order_id: int
user_id: int
total_amount: float
items_count: int
class OrderShippedEvent(BaseModel):
"""Event: заказ отправлен."""
event_type: Literal["order.shipped"] = "order.shipped"
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = Field(default_factory=datetime.utcnow)
order_id: int
tracking_number: str
carrier: str
# Producer с валидацией
class KafkaEventProducer:
"""Kafka producer с Pydantic валидацией."""
def __init__(self, bootstrap_servers: list[str]):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def send_event(self, topic: str, event: BaseModel):
"""Отправка события с валидацией."""
# Pydantic сериализация с валидацией
event_data = event.model_dump(mode='json')
# Отправка в Kafka
future = self.producer.send(topic, value=event_data)
# Ждём подтверждения
try:
record_metadata = future.get(timeout=10)
print(f"✓ Event sent to {record_metadata.topic} partition {record_metadata.partition}")
except Exception as e:
print(f"✗ Failed to send event: {e}")
raise
# Consumer с валидацией
class KafkaEventConsumer:
"""Kafka consumer с Pydantic валидацией."""
def __init__(
self,
bootstrap_servers: list[str],
topic: str,
group_id: str,
event_model: type[BaseModel]
):
self.consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
self.event_model = event_model
def consume(self):
"""Consume events с валидацией."""
for message in self.consumer:
try:
# Pydantic валидация
event = self.event_model(**message.value)
# Обработка события
self.handle_event(event)
except ValidationError as e:
print(f"✗ Invalid event received: {e}")
# Отправляем в Dead Letter Queue
self.send_to_dlq(message.value, str(e))
def handle_event(self, event: BaseModel):
"""Override this method to handle events."""
raise NotImplementedError
# Использование
def main():
# Producer
producer = KafkaEventProducer(
bootstrap_servers=['localhost:9092']
)
# Отправляем события
event = OrderCreatedEvent(
order_id=123,
user_id=456,
total_amount=99.99,
items_count=3
)
producer.send_event('orders', event)
# Consumer
class OrderConsumer(KafkaEventConsumer):
def handle_event(self, event: OrderCreatedEvent):
print(f"Processing order {event.order_id} for user {event.user_id}")
# Бизнес-логика
consumer = OrderConsumer(
bootstrap_servers=['localhost:9092'],
topic='orders',
group_id='order-processor',
event_model=OrderCreatedEvent
)
consumer.consume()RabbitMQ с Pydantic
import pika
from pydantic import BaseModel, ValidationError
import json
class NotificationMessage(BaseModel):
"""Сообщение для отправки уведомления."""
user_id: int
notification_type: Literal["email", "sms", "push"]
subject: str = Field(max_length=200)
body: str = Field(max_length=5000)
priority: Literal["low", "normal", "high"] = "normal"
class RabbitMQPublisher:
"""RabbitMQ publisher с Pydantic валидацией."""
def __init__(self, host: str = 'localhost'):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host)
)
self.channel = self.connection.channel()
def publish(self, queue: str, message: BaseModel, routing_key: str = ''):
"""Публикация сообщения с валидацией."""
# Объявляем очередь
self.channel.queue_declare(queue=queue, durable=True)
# Pydantic сериализация
message_body = message.model_dump_json()
# Публикация
self.channel.basic_publish(
exchange='',
routing_key=queue,
body=message_body,
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
content_type='application/json'
)
)
print(f"✓ Message published to {queue}")
def close(self):
self.connection.close()
class RabbitMQConsumer:
"""RabbitMQ consumer с Pydantic валидацией."""
def __init__(self, host: str, queue: str, message_model: type[BaseModel]):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host)
)
self.channel = self.connection.channel()
self.queue = queue
self.message_model = message_model
# Объявляем очередь
self.channel.queue_declare(queue=queue, durable=True)
def start_consuming(self):
"""Начать consume сообщений."""
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue=self.queue,
on_message_callback=self._on_message
)
print(f"Waiting for messages in {self.queue}...")
self.channel.start_consuming()
def _on_message(self, ch, method, properties, body):
"""Callback для обработки сообщения."""
try:
# Десериализация
data = json.loads(body)
# Pydantic валидация
message = self.message_model(**data)
# Обработка
self.handle_message(message)
# Подтверждение обработки
ch.basic_ack(delivery_tag=method.delivery_tag)
except ValidationError as e:
print(f"✗ Invalid message: {e}")
# Reject + отправка в DLQ
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
print(f"✗ Processing error: {e}")
# Requeue для повторной обработки
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def handle_message(self, message: BaseModel):
"""Override this method to handle messages."""
raise NotImplementedError
# Использование
def main():
# Publisher
publisher = RabbitMQPublisher(host='localhost')
notification = NotificationMessage(
user_id=123,
notification_type="email",
subject="Order confirmation",
body="Your order #456 has been confirmed.",
priority="high"
)
publisher.publish('notifications', notification)
publisher.close()
# Consumer
class NotificationConsumer(RabbitMQConsumer):
def handle_message(self, message: NotificationMessage):
print(f"Sending {message.notification_type} to user {message.user_id}")
# Отправка уведомления
consumer = NotificationConsumer(
host='localhost',
queue='notifications',
message_model=NotificationMessage
)
consumer.start_consuming()Custom Plugins: расширение Pydantic
Custom validator plugin
from pydantic import BaseModel, GetCoreSchemaHandler
from pydantic_core import core_schema
from typing import Any
class StrictPhoneNumber(str):
"""Кастомный тип для телефонных номеров с валидацией."""
@classmethod
def __get_pydantic_core_schema__(
cls,
source_type: Any,
handler: GetCoreSchemaHandler
) -> core_schema.CoreSchema:
"""Определяем схему валидации для Pydantic."""
return core_schema.with_info_after_validator_function(
cls._validate,
core_schema.str_schema(),
serialization=core_schema.plain_serializer_function_ser_schema(
lambda x: str(x)
)
)
@classmethod
def _validate(cls, value: str, _info) -> "StrictPhoneNumber":
"""Валидация телефонного номера."""
import re
# Убираем все нецифровые символы
digits = re.sub(r'\D', '', value)
# Проверка длины
if len(digits) < 10 or len(digits) > 15:
raise ValueError('Phone number must be 10-15 digits')
# Форматирование
if len(digits) == 11 and digits.startswith('7'):
# Российский номер
formatted = f"+7 ({digits[1:4]}) {digits[4:7]}-{digits[7:9]}-{digits[9:]}"
elif len(digits) == 10:
# Номер без кода страны
formatted = f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
else:
formatted = f"+{digits}"
return cls(formatted)
class User(BaseModel):
username: str
phone: StrictPhoneNumber
# Использование
user = User(username="alice", phone="8-900-123-45-67")
print(user.phone) # +7 (900) 123-45-67Custom serialization plugin
from pydantic import BaseModel
from typing import Any
from decimal import Decimal
class MoneyField(Decimal):
"""Кастомное поле для денег с форматированием."""
@classmethod
def __get_pydantic_core_schema__(cls, source_type, handler):
return core_schema.with_info_before_validator_function(
cls._parse,
core_schema.decimal_schema(),
serialization=core_schema.plain_serializer_function_ser_schema(
cls._serialize
)
)
@classmethod
def _parse(cls, value: Any, _info) -> Decimal:
"""Парсинг различных форматов."""
if isinstance(value, str):
# Убираем символы валюты и пробелы
cleaned = value.replace('$', '').replace('€', '').replace(' ', '').replace(',', '')
return Decimal(cleaned)
return Decimal(value)
@classmethod
def _serialize(cls, value: Decimal) -> str:
"""Сериализация с форматированием."""
# Форматируем с 2 знаками и разделителями тысяч
return f"${value:,.2f}"
class Product(BaseModel):
name: str
price: MoneyField
# Использование
product = Product(name="Laptop", price="1234.56")
print(product.model_dump())
# {'name': 'Laptop', 'price': '$1,234.56'}🎯 Pydantic Ecosystem Map
Визуализация всех интеграций Pydantic v2:
Заключение серии
В пяти частях мы разобрали Pydantic v2 от миграции до advanced topics:
- Часть 1: Миграция v1→v2 — процесс, инциденты, метрики
- Часть 2: Production patterns — ConfigDict, валидация, сериализация, error handling, security
- Часть 3: Производительность — профилирование, benchmarks, оптимизации
- Часть 4: Микросервисы — версионирование, schema registry, FastAPI, SQLAlchemy
- Часть 5 (эта статья): Advanced topics — async validation, GraphQL, CLI, message brokers, custom plugins
Ключевые выводы:
- Pydantic v2 — универсальный инструмент: от REST API до GraphQL, от CLI до message brokers
- Async validation через dependencies: не блокируем event loop
- GraphQL + Strawberry: Pydantic как единый источник истины для схем
- CLI с Typer: валидация конфигурации из коробки
- Message brokers: type-safe events с Pydantic
- Custom plugins: расширяемость для специфичных кейсов
Дальнейшее чтение:
Спасибо за чтение серии! Вопросы и feedback — в комментариях.