Перейти к содержимому

Pydantic v2 Advanced Topics: async валидация, GraphQL, CLI и message brokers

Константин Потапов
28 min

Продвинутые сценарии использования Pydantic v2: async validation, интеграция с GraphQL/Strawberry, CLI приложения с Typer, message brokers (Kafka, RabbitMQ) и custom plugins

Это пятая, заключительная статья серии о Pydantic v2. Часть 1 — миграция, часть 2 — patterns, часть 3 — производительность, часть 4 — микросервисы. Здесь — продвинутые темы и интеграции, которые выходят за рамки типичного REST API.


📚 Серия статей: Pydantic v2 в Production


⚡ TL;DR (для торопящихся)

  • Async validation: Dependency injection (recommended) vs async wrapper vs batch validation с semaphore
  • GraphQL + Strawberry: Pydantic как единый источник истины для схем, автоматическая конвертация через from_pydantic/to_pydantic
  • CLI с Typer: Валидация аргументов, YAML/JSON конфигов, generate-config команды
  • Message brokers: Type-safe Kafka events, RabbitMQ с Pydantic, DLQ для невалидных сообщений
  • Custom plugins: Кастомные типы через __get_pydantic_core_schema__, расширение валидации и сериализации
  • Главный урок: Pydantic универсален — от REST API до GraphQL, от CLI до event-driven архитектуры

Для кого: Full-stack разработчиков, тех кто использует GraphQL/gRPC/message brokers, команд с нестандартными требованиями.


Async Validation: I/O операции без блокировки

Проблема: синхронные валидаторы блокируют event loop

from pydantic import BaseModel, field_validator
import httpx
 
# ❌ Плохо: синхронный HTTP запрос в валидаторе
class PromoCode(BaseModel):
    code: str
 
    @field_validator('code')
    @classmethod
    def validate_code(cls, v: str) -> str:
        # Блокирует event loop на время HTTP запроса!
        response = httpx.get(f"http://promo-service/validate/{v}")
        if response.status_code != 200:
            raise ValueError(f'Invalid promo code: {v}')
        return v
 
# В async endpoint это катастрофа:
@app.post("/orders")
async def create_order(order_data: dict):
    # Валидация заблокирует весь event loop
    order = PromoCode(**order_data)  # ← блокировка!
    return order

Решение 1: Dependency Injection (рекомендуется)

from fastapi import Depends, HTTPException
import httpx
 
class PromoCode(BaseModel):
    """Модель без I/O в валидаторе."""
    code: str
 
async def validate_promo_code_async(
    order: PromoCode,
    http_client: httpx.AsyncClient = Depends(get_http_client)
) -> PromoCode:
    """Async dependency для валидации промокода."""
    try:
        response = await http_client.get(
            f"http://promo-service/validate/{order.code}",
            timeout=2.0
        )
 
        if response.status_code != 200:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid promo code: {order.code}"
            )
 
        # Можем добавить данные из response в order
        promo_data = response.json()
        order.discount_percent = promo_data.get('discount', 0)
 
        return order
 
    except httpx.TimeoutException:
        raise HTTPException(
            status_code=504,
            detail="Promo service timeout"
        )
 
@app.post("/orders")
async def create_order(
    order: PromoCode = Depends(validate_promo_code_async)
):
    # order уже провалидирован асинхронно
    return order

Решение 2: Custom validator с async wrapper

from pydantic import BaseModel, model_validator
from typing import Any
import asyncio
 
class OrderWithAsyncValidation(BaseModel):
    """Модель с async валидацией через wrapper."""
    product_id: str
    quantity: int
    promo_code: str | None = None
 
    _validated_async: bool = False
 
    @model_validator(mode='after')
    def ensure_async_validated(self):
        """Проверяем, что async валидация прошла."""
        if not self._validated_async:
            raise ValueError(
                'Model must be validated with validate_async() method'
            )
        return self
 
    @classmethod
    async def validate_async(cls, data: dict) -> "OrderWithAsyncValidation":
        """Async конструктор с валидацией."""
        # Сначала создаём без async валидации
        obj = cls.model_construct(**data)
 
        # Async валидация
        if obj.promo_code:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"http://promo-service/validate/{obj.promo_code}"
                )
 
                if response.status_code != 200:
                    raise ValueError(f'Invalid promo code: {obj.promo_code}')
 
        # Проверяем наличие продукта
        if obj.product_id:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"http://catalog-service/products/{obj.product_id}"
                )
 
                if response.status_code != 200:
                    raise ValueError(f'Product not found: {obj.product_id}')
 
        # Помечаем как провалидированный
        obj._validated_async = True
 
        return obj
 
# Использование
@app.post("/orders")
async def create_order(order_data: dict):
    # Async валидация
    order = await OrderWithAsyncValidation.validate_async(order_data)
    return order

Решение 3: Batch async validation

from pydantic import BaseModel
from typing import TypeVar, Generic
import asyncio
 
T = TypeVar('T', bound=BaseModel)
 
class AsyncValidator(Generic[T]):
    """Batch async валидатор для списков."""
 
    def __init__(self, model: type[T]):
        self.model = model
 
    async def validate_many(
        self,
        data_list: list[dict],
        max_concurrent: int = 10
    ) -> list[T]:
        """Валидация списка объектов с ограничением concurrency."""
 
        semaphore = asyncio.Semaphore(max_concurrent)
 
        async def validate_one(data: dict) -> T:
            async with semaphore:
                # Предположим, что модель имеет validate_async
                return await self.model.validate_async(data)
 
        # Запускаем все валидации параллельно (с ограничением)
        tasks = [validate_one(data) for data in data_list]
        results = await asyncio.gather(*tasks, return_exceptions=True)
 
        # Разделяем успешные и ошибки
        valid = []
        errors = []
 
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                errors.append({'index': i, 'error': str(result)})
            else:
                valid.append(result)
 
        if errors:
            raise ValueError(f'Validation failed for {len(errors)} items: {errors}')
 
        return valid
 
# Использование
@app.post("/orders/batch")
async def create_orders_batch(orders_data: list[dict]):
    validator = AsyncValidator(OrderWithAsyncValidation)
 
    # Валидируем все заказы параллельно (макс 10 одновременно)
    orders = await validator.validate_many(orders_data, max_concurrent=10)
 
    return {"created": len(orders), "orders": orders}

GraphQL + Strawberry: Pydantic в качестве input/output

Базовая интеграция Pydantic + Strawberry

import strawberry
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
 
# Pydantic модели для бизнес-логики
class UserInput(BaseModel):
    """Input модель с валидацией."""
    username: str = Field(min_length=3, max_length=20, pattern=r'^[a-zA-Z0-9_]+$')
    email: str = Field(pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    age: int = Field(ge=18, le=120)
 
class UserModel(BaseModel):
    """Внутренняя модель пользователя."""
    id: int
    username: str
    email: str
    age: int
    created_at: datetime
 
# Strawberry типы для GraphQL
@strawberry.experimental.pydantic.input(model=UserInput)
class UserInputType:
    """GraphQL input type из Pydantic."""
    username: strawberry.auto
    email: strawberry.auto
    age: strawberry.auto
 
@strawberry.experimental.pydantic.type(model=UserModel)
class UserType:
    """GraphQL type из Pydantic."""
    id: strawberry.auto
    username: strawberry.auto
    email: strawberry.auto
    age: strawberry.auto
    created_at: strawberry.auto
 
# GraphQL schema
@strawberry.type
class Query:
    @strawberry.field
    def user(self, id: int) -> Optional[UserType]:
        """Получить пользователя по ID."""
        user = db.query(User).get(id)
        if not user:
            return None
 
        # Pydantic валидация ORM → Pydantic → Strawberry
        user_model = UserModel.model_validate(user)
        return UserType.from_pydantic(user_model)
 
@strawberry.type
class Mutation:
    @strawberry.mutation
    def create_user(self, user_input: UserInputType) -> UserType:
        """Создать пользователя."""
        # Strawberry → Pydantic (с валидацией)
        user_data = user_input.to_pydantic()
 
        # Бизнес-логика
        user = User(**user_data.model_dump())
        db.add(user)
        db.commit()
 
        # Pydantic → Strawberry
        user_model = UserModel.model_validate(user)
        return UserType.from_pydantic(user_model)
 
schema = strawberry.Schema(query=Query, mutation=Mutation)

Сложные GraphQL типы с Pydantic валидацией

from pydantic import BaseModel, model_validator
from typing import Literal
import strawberry
from enum import Enum
 
# Pydantic модели
class AddressInput(BaseModel):
    street: str = Field(min_length=5, max_length=200)
    city: str = Field(min_length=2, max_length=100)
    country: str = Field(min_length=2, max_length=2, pattern=r'^[A-Z]{2}$')
    postal_code: str
 
class PaymentMethodInput(BaseModel):
    type: Literal["card", "bank_transfer", "crypto"]
 
    # Conditional fields
    card_number: str | None = None
    iban: str | None = None
    wallet_address: str | None = None
 
    @model_validator(mode='after')
    def validate_payment_fields(self):
        """Проверка соответствия полей типу платежа."""
        if self.type == "card" and not self.card_number:
            raise ValueError("card_number required for card payment")
        if self.type == "bank_transfer" and not self.iban:
            raise ValueError("iban required for bank transfer")
        if self.type == "crypto" and not self.wallet_address:
            raise ValueError("wallet_address required for crypto payment")
        return self
 
class OrderInput(BaseModel):
    items: list[dict] = Field(min_length=1)
    shipping_address: AddressInput
    payment_method: PaymentMethodInput
 
    @model_validator(mode='after')
    def validate_order_total(self):
        """Валидация общей суммы заказа."""
        total = sum(item.get('quantity', 0) * item.get('price', 0) for item in self.items)
        if total <= 0:
            raise ValueError("Order total must be positive")
        return self
 
# Strawberry types
@strawberry.experimental.pydantic.input(model=AddressInput)
class AddressInputType:
    street: strawberry.auto
    city: strawberry.auto
    country: strawberry.auto
    postal_code: strawberry.auto
 
@strawberry.experimental.pydantic.input(model=PaymentMethodInput)
class PaymentMethodInputType:
    type: strawberry.auto
    card_number: strawberry.auto
    iban: strawberry.auto
    wallet_address: strawberry.auto
 
@strawberry.experimental.pydantic.input(model=OrderInput)
class OrderInputType:
    items: strawberry.auto
    shipping_address: strawberry.auto
    payment_method: strawberry.auto
 
@strawberry.type
class Mutation:
    @strawberry.mutation
    def create_order(self, order_input: OrderInputType) -> str:
        """Создать заказ с комплексной валидацией."""
        # Pydantic валидация сработает автоматически
        order_data = order_input.to_pydantic()
 
        # Все валидаторы уже отработали:
        # - Conditional validation (payment method)
        # - Cross-field validation (order total)
        # - Field constraints (lengths, patterns)
 
        # Бизнес-логика
        order = create_order_in_db(order_data)
 
        return f"Order {order.id} created"

GraphQL subscriptions с Pydantic

from typing import AsyncGenerator
import strawberry
from pydantic import BaseModel
import asyncio
 
class OrderUpdate(BaseModel):
    """Модель обновления заказа."""
    order_id: int
    status: Literal["pending", "processing", "shipped", "delivered"]
    timestamp: datetime
    message: str
 
@strawberry.experimental.pydantic.type(model=OrderUpdate)
class OrderUpdateType:
    order_id: strawberry.auto
    status: strawberry.auto
    timestamp: strawberry.auto
    message: strawberry.auto
 
@strawberry.type
class Subscription:
    @strawberry.subscription
    async def order_updates(
        self,
        order_id: int
    ) -> AsyncGenerator[OrderUpdateType, None]:
        """Подписка на обновления заказа."""
 
        # Симуляция real-time обновлений
        while True:
            # Получаем обновление из Redis/Kafka/WebSocket
            update_data = await get_order_update(order_id)
 
            if update_data:
                # Pydantic валидация
                update = OrderUpdate(**update_data)
 
                # Отправляем клиенту
                yield OrderUpdateType.from_pydantic(update)
 
            await asyncio.sleep(1)
 
schema = strawberry.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription
)

CLI приложения с Typer + Pydantic

Базовая интеграция Typer + Pydantic

import typer
from pydantic import BaseModel, Field, ValidationError
from typing import Optional
from pathlib import Path
 
app = typer.Typer()
 
# Pydantic модель для конфигурации CLI
class DeployConfig(BaseModel):
    """Конфигурация deployment."""
    environment: Literal["dev", "staging", "production"]
    region: str = Field(pattern=r'^[a-z]{2}-[a-z]+-\d+$')  # e.g., us-east-1
    instances: int = Field(ge=1, le=100)
    config_file: Path
 
    @model_validator(mode='after')
    def validate_config_file_exists(self):
        """Проверяем существование файла конфигурации."""
        if not self.config_file.exists():
            raise ValueError(f"Config file not found: {self.config_file}")
        return self
 
@app.command()
def deploy(
    environment: str = typer.Option(..., help="Environment: dev/staging/production"),
    region: str = typer.Option(..., help="AWS region (e.g., us-east-1)"),
    instances: int = typer.Option(1, help="Number of instances"),
    config_file: Path = typer.Option(Path("config.yaml"), help="Config file path"),
):
    """Deploy application with validated configuration."""
 
    try:
        # Pydantic валидация CLI аргументов
        config = DeployConfig(
            environment=environment,
            region=region,
            instances=instances,
            config_file=config_file
        )
 
        typer.echo(f"✓ Configuration validated")
        typer.echo(f"  Environment: {config.environment}")
        typer.echo(f"  Region: {config.region}")
        typer.echo(f"  Instances: {config.instances}")
 
        # Deployment logic
        perform_deployment(config)
 
        typer.echo(typer.style("✓ Deployment successful!", fg=typer.colors.GREEN, bold=True))
 
    except ValidationError as e:
        typer.echo(typer.style("✗ Configuration errors:", fg=typer.colors.RED, bold=True))
        for error in e.errors():
            field = '.'.join(str(loc) for loc in error['loc'])
            typer.echo(f"  • {field}: {error['msg']}")
        raise typer.Exit(code=1)
 
if __name__ == "__main__":
    app()

CLI с JSON/YAML конфигурацией через Pydantic

import typer
from pydantic import BaseModel, Field
from pathlib import Path
import yaml
import json
 
app = typer.Typer()
 
class DatabaseConfig(BaseModel):
    """Конфигурация БД."""
    host: str
    port: int = Field(ge=1, le=65535)
    database: str
    username: str
    password: str = Field(min_length=8)
 
class ServerConfig(BaseModel):
    """Конфигурация сервера."""
    host: str = "0.0.0.0"
    port: int = Field(default=8000, ge=1, le=65535)
    workers: int = Field(default=4, ge=1, le=64)
    log_level: Literal["debug", "info", "warning", "error"] = "info"
 
class AppConfig(BaseModel):
    """Полная конфигурация приложения."""
    app_name: str
    version: str = Field(pattern=r'^\d+\.\d+\.\d+$')
    database: DatabaseConfig
    server: ServerConfig
 
    @classmethod
    def from_file(cls, file_path: Path) -> "AppConfig":
        """Загрузка конфигурации из YAML/JSON файла."""
        with open(file_path) as f:
            if file_path.suffix in ['.yaml', '.yml']:
                data = yaml.safe_load(f)
            elif file_path.suffix == '.json':
                data = json.load(f)
            else:
                raise ValueError(f"Unsupported config format: {file_path.suffix}")
 
        return cls(**data)
 
@app.command()
def start(
    config_file: Path = typer.Option(
        Path("config.yaml"),
        help="Path to configuration file (YAML or JSON)"
    ),
    validate_only: bool = typer.Option(
        False,
        "--validate-only",
        help="Only validate configuration without starting"
    )
):
    """Start application server with configuration validation."""
 
    try:
        # Загрузка и валидация конфигурации
        config = AppConfig.from_file(config_file)
 
        typer.echo(f"✓ Configuration loaded from {config_file}")
        typer.echo(f"  App: {config.app_name} v{config.version}")
        typer.echo(f"  Database: {config.database.username}@{config.database.host}:{config.database.port}")
        typer.echo(f"  Server: {config.server.host}:{config.server.port} ({config.server.workers} workers)")
 
        if validate_only:
            typer.echo(typer.style("✓ Configuration is valid!", fg=typer.colors.GREEN, bold=True))
            return
 
        # Запуск сервера
        start_server(config)
 
    except ValidationError as e:
        typer.echo(typer.style(f"✗ Invalid configuration in {config_file}:", fg=typer.colors.RED, bold=True))
        for error in e.errors():
            field = '.'.join(str(loc) for loc in error['loc'])
            typer.echo(f"  • {field}: {error['msg']}")
        raise typer.Exit(code=1)
    except FileNotFoundError:
        typer.echo(typer.style(f"✗ Config file not found: {config_file}", fg=typer.colors.RED))
        raise typer.Exit(code=1)
 
@app.command()
def generate_config(
    output: Path = typer.Option(Path("config.yaml"), help="Output file path"),
    format: Literal["yaml", "json"] = typer.Option("yaml", help="Output format")
):
    """Generate example configuration file."""
 
    # Создаём пример конфигурации
    example_config = AppConfig(
        app_name="my-app",
        version="1.0.0",
        database=DatabaseConfig(
            host="localhost",
            port=5432,
            database="mydb",
            username="user",
            password="secure_password"
        ),
        server=ServerConfig()
    )
 
    # Сохраняем в файл
    with open(output, 'w') as f:
        if format == "yaml":
            yaml.dump(example_config.model_dump(), f, default_flow_style=False)
        else:
            json.dump(example_config.model_dump(), f, indent=2)
 
    typer.echo(f"✓ Example configuration saved to {output}")
 
if __name__ == "__main__":
    app()

Message Brokers: Kafka + RabbitMQ

Kafka с Pydantic схемами

from pydantic import BaseModel, Field
from kafka import KafkaProducer, KafkaConsumer
from datetime import datetime
import json
 
# Pydantic модели для Kafka messages
class OrderCreatedEvent(BaseModel):
    """Event: новый заказ создан."""
    event_type: Literal["order.created"] = "order.created"
    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = Field(default_factory=datetime.utcnow)
 
    order_id: int
    user_id: int
    total_amount: float
    items_count: int
 
class OrderShippedEvent(BaseModel):
    """Event: заказ отправлен."""
    event_type: Literal["order.shipped"] = "order.shipped"
    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = Field(default_factory=datetime.utcnow)
 
    order_id: int
    tracking_number: str
    carrier: str
 
# Producer с валидацией
class KafkaEventProducer:
    """Kafka producer с Pydantic валидацией."""
 
    def __init__(self, bootstrap_servers: list[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
 
    def send_event(self, topic: str, event: BaseModel):
        """Отправка события с валидацией."""
        # Pydantic сериализация с валидацией
        event_data = event.model_dump(mode='json')
 
        # Отправка в Kafka
        future = self.producer.send(topic, value=event_data)
 
        # Ждём подтверждения
        try:
            record_metadata = future.get(timeout=10)
            print(f"✓ Event sent to {record_metadata.topic} partition {record_metadata.partition}")
        except Exception as e:
            print(f"✗ Failed to send event: {e}")
            raise
 
# Consumer с валидацией
class KafkaEventConsumer:
    """Kafka consumer с Pydantic валидацией."""
 
    def __init__(
        self,
        bootstrap_servers: list[str],
        topic: str,
        group_id: str,
        event_model: type[BaseModel]
    ):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: json.loads(v.decode('utf-8'))
        )
        self.event_model = event_model
 
    def consume(self):
        """Consume events с валидацией."""
        for message in self.consumer:
            try:
                # Pydantic валидация
                event = self.event_model(**message.value)
 
                # Обработка события
                self.handle_event(event)
 
            except ValidationError as e:
                print(f"✗ Invalid event received: {e}")
                # Отправляем в Dead Letter Queue
                self.send_to_dlq(message.value, str(e))
 
    def handle_event(self, event: BaseModel):
        """Override this method to handle events."""
        raise NotImplementedError
 
# Использование
def main():
    # Producer
    producer = KafkaEventProducer(
        bootstrap_servers=['localhost:9092']
    )
 
    # Отправляем события
    event = OrderCreatedEvent(
        order_id=123,
        user_id=456,
        total_amount=99.99,
        items_count=3
    )
    producer.send_event('orders', event)
 
    # Consumer
    class OrderConsumer(KafkaEventConsumer):
        def handle_event(self, event: OrderCreatedEvent):
            print(f"Processing order {event.order_id} for user {event.user_id}")
            # Бизнес-логика
 
    consumer = OrderConsumer(
        bootstrap_servers=['localhost:9092'],
        topic='orders',
        group_id='order-processor',
        event_model=OrderCreatedEvent
    )
    consumer.consume()

RabbitMQ с Pydantic

import pika
from pydantic import BaseModel, ValidationError
import json
 
class NotificationMessage(BaseModel):
    """Сообщение для отправки уведомления."""
    user_id: int
    notification_type: Literal["email", "sms", "push"]
    subject: str = Field(max_length=200)
    body: str = Field(max_length=5000)
    priority: Literal["low", "normal", "high"] = "normal"
 
class RabbitMQPublisher:
    """RabbitMQ publisher с Pydantic валидацией."""
 
    def __init__(self, host: str = 'localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()
 
    def publish(self, queue: str, message: BaseModel, routing_key: str = ''):
        """Публикация сообщения с валидацией."""
        # Объявляем очередь
        self.channel.queue_declare(queue=queue, durable=True)
 
        # Pydantic сериализация
        message_body = message.model_dump_json()
 
        # Публикация
        self.channel.basic_publish(
            exchange='',
            routing_key=queue,
            body=message_body,
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
                content_type='application/json'
            )
        )
 
        print(f"✓ Message published to {queue}")
 
    def close(self):
        self.connection.close()
 
class RabbitMQConsumer:
    """RabbitMQ consumer с Pydantic валидацией."""
 
    def __init__(self, host: str, queue: str, message_model: type[BaseModel]):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()
        self.queue = queue
        self.message_model = message_model
 
        # Объявляем очередь
        self.channel.queue_declare(queue=queue, durable=True)
 
    def start_consuming(self):
        """Начать consume сообщений."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=self.queue,
            on_message_callback=self._on_message
        )
 
        print(f"Waiting for messages in {self.queue}...")
        self.channel.start_consuming()
 
    def _on_message(self, ch, method, properties, body):
        """Callback для обработки сообщения."""
        try:
            # Десериализация
            data = json.loads(body)
 
            # Pydantic валидация
            message = self.message_model(**data)
 
            # Обработка
            self.handle_message(message)
 
            # Подтверждение обработки
            ch.basic_ack(delivery_tag=method.delivery_tag)
 
        except ValidationError as e:
            print(f"✗ Invalid message: {e}")
            # Reject + отправка в DLQ
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        except Exception as e:
            print(f"✗ Processing error: {e}")
            # Requeue для повторной обработки
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
 
    def handle_message(self, message: BaseModel):
        """Override this method to handle messages."""
        raise NotImplementedError
 
# Использование
def main():
    # Publisher
    publisher = RabbitMQPublisher(host='localhost')
 
    notification = NotificationMessage(
        user_id=123,
        notification_type="email",
        subject="Order confirmation",
        body="Your order #456 has been confirmed.",
        priority="high"
    )
 
    publisher.publish('notifications', notification)
    publisher.close()
 
    # Consumer
    class NotificationConsumer(RabbitMQConsumer):
        def handle_message(self, message: NotificationMessage):
            print(f"Sending {message.notification_type} to user {message.user_id}")
            # Отправка уведомления
 
    consumer = NotificationConsumer(
        host='localhost',
        queue='notifications',
        message_model=NotificationMessage
    )
    consumer.start_consuming()

Custom Plugins: расширение Pydantic

Custom validator plugin

from pydantic import BaseModel, GetCoreSchemaHandler
from pydantic_core import core_schema
from typing import Any
 
class StrictPhoneNumber(str):
    """Кастомный тип для телефонных номеров с валидацией."""
 
    @classmethod
    def __get_pydantic_core_schema__(
        cls,
        source_type: Any,
        handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        """Определяем схему валидации для Pydantic."""
 
        return core_schema.with_info_after_validator_function(
            cls._validate,
            core_schema.str_schema(),
            serialization=core_schema.plain_serializer_function_ser_schema(
                lambda x: str(x)
            )
        )
 
    @classmethod
    def _validate(cls, value: str, _info) -> "StrictPhoneNumber":
        """Валидация телефонного номера."""
        import re
 
        # Убираем все нецифровые символы
        digits = re.sub(r'\D', '', value)
 
        # Проверка длины
        if len(digits) < 10 or len(digits) > 15:
            raise ValueError('Phone number must be 10-15 digits')
 
        # Форматирование
        if len(digits) == 11 and digits.startswith('7'):
            # Российский номер
            formatted = f"+7 ({digits[1:4]}) {digits[4:7]}-{digits[7:9]}-{digits[9:]}"
        elif len(digits) == 10:
            # Номер без кода страны
            formatted = f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
        else:
            formatted = f"+{digits}"
 
        return cls(formatted)
 
class User(BaseModel):
    username: str
    phone: StrictPhoneNumber
 
# Использование
user = User(username="alice", phone="8-900-123-45-67")
print(user.phone)  # +7 (900) 123-45-67

Custom serialization plugin

from pydantic import BaseModel
from typing import Any
from decimal import Decimal
 
class MoneyField(Decimal):
    """Кастомное поле для денег с форматированием."""
 
    @classmethod
    def __get_pydantic_core_schema__(cls, source_type, handler):
        return core_schema.with_info_before_validator_function(
            cls._parse,
            core_schema.decimal_schema(),
            serialization=core_schema.plain_serializer_function_ser_schema(
                cls._serialize
            )
        )
 
    @classmethod
    def _parse(cls, value: Any, _info) -> Decimal:
        """Парсинг различных форматов."""
        if isinstance(value, str):
            # Убираем символы валюты и пробелы
            cleaned = value.replace('$', '').replace('', '').replace(' ', '').replace(',', '')
            return Decimal(cleaned)
        return Decimal(value)
 
    @classmethod
    def _serialize(cls, value: Decimal) -> str:
        """Сериализация с форматированием."""
        # Форматируем с 2 знаками и разделителями тысяч
        return f"${value:,.2f}"
 
class Product(BaseModel):
    name: str
    price: MoneyField
 
# Использование
product = Product(name="Laptop", price="1234.56")
print(product.model_dump())
# {'name': 'Laptop', 'price': '$1,234.56'}

🎯 Pydantic Ecosystem Map

Визуализация всех интеграций Pydantic v2:


Заключение серии

В пяти частях мы разобрали Pydantic v2 от миграции до advanced topics:

  • Часть 1: Миграция v1→v2 — процесс, инциденты, метрики
  • Часть 2: Production patterns — ConfigDict, валидация, сериализация, error handling, security
  • Часть 3: Производительность — профилирование, benchmarks, оптимизации
  • Часть 4: Микросервисы — версионирование, schema registry, FastAPI, SQLAlchemy
  • Часть 5 (эта статья): Advanced topics — async validation, GraphQL, CLI, message brokers, custom plugins

Ключевые выводы:

  1. Pydantic v2 — универсальный инструмент: от REST API до GraphQL, от CLI до message brokers
  2. Async validation через dependencies: не блокируем event loop
  3. GraphQL + Strawberry: Pydantic как единый источник истины для схем
  4. CLI с Typer: валидация конфигурации из коробки
  5. Message brokers: type-safe events с Pydantic
  6. Custom plugins: расширяемость для специфичных кейсов

Дальнейшее чтение:

Спасибо за чтение серии! Вопросы и feedback — в комментариях.