Перейти к содержимому

Event-Driven Architecture: от теории к production

Константин Потапов
25 мин

Когда синхронные вызовы душат систему, как внедрить события без боли, Kafka vs RabbitMQ на практике и почему eventual consistency — это осознанный выбор. Типичные production-кейсы с метриками и разбор всех граблей EDA.

Event-Driven Architecture: от теории к production

"Мы добавили в систему real-time уведомления. Теперь при создании заказа делается 7 синхронных HTTP-запросов, и если хотя бы один сервис тормозит — весь checkout падает. Users жалуются, CEO требует голов."

Типичная ситуация в растущих стартапах (подобные истории регулярно появляются в блогах Stripe, Airbnb, финтех-компаний). Классическая синхронная архитектура с красивым фасадом превращается в карточный домик: один медленный сервис валит всю систему.

Результат после внедрения event-driven подхода: P99 latency checkout падает с 2.3 секунд до 180мс. Availability растет с 99.5% до 99.95%. Команда перестает получать алерты в 3 ночи (по опыту миграций в Netflix, Uber, других компаний).

В этой статье — практический гайд по Event-Driven Architecture без евангелизма и магического мышления. Только типичные production-кейсы, код на Python, метрики и честные грабли, которые регулярно встречаются в индустрии.

Правда об Event-Driven Architecture: это не про Kafka

Начну с неудобной правды: Event-Driven Architecture — это не технология. Это подход к проектированию систем.

Представьте две модели общения:

Request-Response (синхронная модель) — как телефонный звонок:

Вы: "Алло, какая погода?"
Друг: "Подожди... (ищет информацию) ...+15°C"
Вы: (ждёте всё это время с трубкой у уха)

Event-Driven (асинхронная модель) — как новостная лента:

Вы: (публикуете событие "Хочу знать погоду")
Погодный сервис: (видит событие и публикует "+15°C" в ленту)
Вы: (увидите ответ, когда будете готовы)

Главное отличие: в первом случае вы ждёте, во втором — продолжаете жить.

Типичный сценарий: как синхронные вызовы убивают checkout

E-commerce стартап решает "улучшить UX" и добавляет новые фичи при создании заказа:

Что должно произойти при checkout:

  1. Проверить user в Auth Service (50ms)
  2. Валидировать товары в Catalog Service (80ms)
  3. Проверить промокод в Discount Service (120ms)
  4. Резервировать товар на складе в Inventory Service (200ms)
  5. Создать заказ в Order Service (100ms)
  6. Списать деньги в Payment Service (300ms)
  7. Отправить email через Notification Service (150ms)

Синхронная реализация:

# ❌ Так мы сделали (и поплатились)
from fastapi import FastAPI, HTTPException
import httpx
 
app = FastAPI()
 
@app.post("/checkout")
async def checkout(order: CheckoutRequest):
    async with httpx.AsyncClient(timeout=5.0) as client:
        # 1. Проверяем пользователя
        user = await client.get(f"http://auth-service/users/{order.user_id}")
        if user.status_code != 200:
            raise HTTPException(401, "User not found")
 
        # 2. Валидируем товары
        catalog = await client.post(
            "http://catalog-service/validate",
            json={"items": order.items}
        )
        if catalog.status_code != 200:
            raise HTTPException(400, "Invalid items")
 
        # 3. Проверяем промокод
        discount = await client.post(
            "http://discount-service/validate",
            json={"code": order.promo_code}
        )
 
        # 4. Резервируем на складе
        inventory = await client.post(
            "http://inventory-service/reserve",
            json={"items": order.items}
        )
        if inventory.status_code != 200:
            raise HTTPException(400, "Out of stock")
 
        # 5. Создаём заказ
        order_response = await client.post(
            "http://order-service/orders",
            json=order.dict()
        )
 
        # 6. Списываем деньги
        payment = await client.post(
            "http://payment-service/charge",
            json={"amount": order.total, "user_id": order.user_id}
        )
        if payment.status_code != 200:
            # О нет! Заказ создан, но оплата не прошла!
            # Нужен rollback... но как?
            raise HTTPException(500, "Payment failed")
 
        # 7. Отправляем email
        await client.post(
            "http://notification-service/send",
            json={"email": user.json()["email"], "order_id": order_response.json()["id"]}
        )
 
        return {"order_id": order_response.json()["id"]}

Распространенные проблемы такого подхода:

Проблема #1: Latency складывается

Best case: 50 + 80 + 120 + 200 + 100 + 300 + 150 = 1000ms
Worst case (P99): 200 + 300 + 500 + 800 + 400 + 1200 + 600 = 4000ms (4 секунды!)

Проблема #2: Cascading failures

Notification Service упал (DNS timeout 30s)
→ Checkout ждёт 30 секунд
→ Users видят spinning loader
→ Users жмут F5
→ Создаются duplicate заказы
→ Chaos

Проблема #3: Частичный rollback невозможен

Сценарий:
1. Заказ создан ✅
2. Резерв на складе сделан ✅
3. Payment Service упал ❌

Результат: Товар зарезервирован, но не оплачен.
Решение: ??? (manual cleanup в админке)

Типичные метрики такой системы под нагрузкой:

  • P99 latency checkout: 3.8+ секунд
  • Success rate: 92% (8% падают из-за timeouts)
  • Duplicate orders: ~5% (users жмут F5 из-за медленной загрузки)
  • Неудовлетворенные клиенты: растут экспоненциально

Синхронные HTTP-вызовы между микросервисами — это как цепь из слабых звеньев. Одно звено ломается → вся цепь рвётся. Latency складывается, failures каскадируются, rollback превращается в кошмар.


Event-Driven подход: архитектурное решение

Ключевая идея: Разделить checkout на две фазы:

Фаза 1 (синхронная): Критичная бизнес-логика

  • Валидация данных
  • Резерв на складе (must succeed сейчас)
  • Создание заказа в статусе PENDING

Фаза 2 (асинхронная): Всё остальное через события

  • Списание денег
  • Email уведомления
  • Аналитика
  • Обновление рекомендаций

Новая архитектура:

# ✅ Event-driven подход
from fastapi import FastAPI, HTTPException
from kafka import KafkaProducer
import json
 
app = FastAPI()
 
# Kafka producer (можно Redis Streams, RabbitMQ и т.д.)
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
 
@app.post("/checkout")
async def checkout(order: CheckoutRequest):
    # СИНХРОННО (критично для UX)
    # 1. Быстрая валидация
    validate_order_data(order)  # 5ms, в памяти
 
    # 2. Резерв на складе (единственный HTTP-запрос!)
    async with httpx.AsyncClient(timeout=2.0) as client:
        inventory = await client.post(
            "http://inventory-service/reserve",
            json={"items": order.items}
        )
        if inventory.status_code != 200:
            raise HTTPException(400, "Out of stock")
 
    # 3. Создаём заказ в статусе PENDING
    order_id = await db.create_order(order, status="PENDING")
 
    # АСИНХРОННО (через события)
    # Публикуем событие — дальше не наша проблема
    producer.send('orders.created', {
        'order_id': order_id,
        'user_id': order.user_id,
        'items': order.items,
        'total': order.total,
        'timestamp': datetime.utcnow().isoformat()
    })
 
    # Возвращаем результат мгновенно!
    return {
        "order_id": order_id,
        "status": "pending",
        "message": "Order created, payment processing"
    }

Consumers (отдельные сервисы слушают события):

# Payment Service (слушает события "orders.created")
from kafka import KafkaConsumer
import json
 
consumer = KafkaConsumer(
    'orders.created',
    bootstrap_servers='kafka:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='payment-service'
)
 
for message in consumer:
    order = message.value
 
    try:
        # Списываем деньги
        payment = await charge_payment(order['user_id'], order['total'])
 
        # Публикуем событие об успехе
        producer.send('payments.completed', {
            'order_id': order['order_id'],
            'payment_id': payment.id,
            'status': 'success'
        })
 
    except PaymentError as e:
        # Публикуем событие об ошибке
        producer.send('payments.failed', {
            'order_id': order['order_id'],
            'reason': str(e)
        })
# Notification Service (слушает события "payments.completed")
consumer = KafkaConsumer(
    'payments.completed',
    bootstrap_servers='kafka:9092',
    group_id='notification-service'
)
 
for message in consumer:
    payment = message.value
 
    # Отправляем email (даже если упадёт — не повлияет на checkout)
    await send_email(
        user_id=payment['user_id'],
        template='order_confirmation',
        data={'order_id': payment['order_id']}
    )

Типичные результаты после внедрения (по опыту миграций в индустрии):

Request-Response
Event-Driven
P99 latency checkout
3.8s
180ms
95%
Success rate
92%
99.9%
9%
Duplicate orders
5%
0.01%
100%
Зависимость от 7 сервисов
Да
Зависимость от 1 сервиса
(Inventory)

Экономия на инфраструктуре: В типичных случаях количество инстансов снижается на 20-40% (за счёт меньшей нагрузки и лучшего scaling). Это ~$500-1000/месяц для средних проектов.

Главное: Улучшается UX (быстрый checkout), снижается количество инцидентов, команда получает меньше алертов по ночам.


Когда использовать Event-Driven Architecture

Event-Driven — это не серебряная пуля. Есть случаи, когда синхронный подход проще и лучше.

Выбирайте EDA, если ответили "ДА" на 3+ вопроса:

1. Есть асинхронные операции?

Примеры:
- Отправка email/SMS
- Генерация отчётов
- Обработка изображений/видео
- ML-инференс
- Аналитика

2. Нужна отказоустойчивость?

Сценарий: Email-сервис упал
❌ Синхронно: Checkout падает для всех
✅ Event-driven: Checkout работает, emails отправятся позже

3. Микросервисная архитектура?

5+ сервисов → высокий риск cascading failures
Event-driven снижает coupling между сервисами

4. Высокая нагрузка?

> 1000 req/s → синхронные вызовы тормозят
Event bus работает как буфер, сглаживает пики нагрузки

5. Нужна аудит-трейл / Event Sourcing?

Требование: восстановить состояние системы на любой момент времени
Event log = полная история всех изменений

6. Разные скорости обработки?

Fast: API endpoint отвечает за 50ms
Slow: Email генерируется 2 секунды

Event-driven позволяет не ждать slow операций

НЕ используйте EDA, если:

❌ Простая CRUD-система

Todo-app, блог, простая админка
→ Синхронный REST API проще и понятнее

❌ Нужна немедленная консистентность

Банковская транзакция: деньги должны списаться СЕЙЧАС
→ Eventual consistency не подходит

❌ Команда не готова

Debugging event-driven систем сложнее
Нужны:
- Distributed tracing
- Event monitoring
- Понимание eventual consistency

❌ Один монолит, 3 разработчика

Overhead от event bus не оправдан
→ Начните с модульного монолита

Best practice из индустрии: Если операция должна завершиться ДО того, как пользователь получит ответ — используйте синхронный вызов. Всё остальное — кандидат на события. Этот подход используется в Netflix, Uber, Spotify и других high-scale системах.


Основные паттерны Event-Driven систем

Event-Driven — это не просто "бросить событие в Kafka". Есть несколько паттернов с разными trade-offs.

Паттерн #1: Event Notification (простые уведомления)

Идея: Событие содержит минимум данных, только ID и тип события.

# Publisher
producer.send('user.registered', {
    'event_id': 'evt_123',
    'user_id': 'usr_456',
    'timestamp': '2025-12-26T10:00:00Z'
})
 
# Consumer (должен запросить данные сам)
@app.on_event('user.registered')
async def on_user_registered(event):
    # Делаем HTTP-запрос за данными
    user = await user_service.get_user(event['user_id'])
    await send_welcome_email(user.email)

Плюсы:

  • ✅ Маленький размер событий
  • ✅ Нет дублирования данных

Минусы:

  • ❌ Consumer зависит от source service (coupling)
  • ❌ Дополнительный HTTP-запрос (latency)

Когда использовать: Данные часто меняются, и актуальность критична.


Паттерн #2: Event-Carried State Transfer (события с данными)

Идея: Событие содержит ВСЕ необходимые данные.

# Publisher
producer.send('user.registered', {
    'event_id': 'evt_123',
    'user_id': 'usr_456',
    'email': 'user@example.com',
    'name': 'John Doe',
    'created_at': '2025-12-26T10:00:00Z',
    'subscription_plan': 'premium'
})
 
# Consumer (не делает HTTP-запросов!)
@app.on_event('user.registered')
async def on_user_registered(event):
    # Всё уже есть в событии
    await send_welcome_email(
        email=event['email'],
        name=event['name']
    )

Плюсы:

  • ✅ Consumer не зависит от source service
  • ✅ Нет дополнительных HTTP-запросов
  • ✅ Можно обработать событие, даже если source service упал

Минусы:

  • ❌ Большой размер событий (больше трафика)
  • ❌ Дублирование данных (риск inconsistency)

Когда использовать: Consumers должны работать независимо, данные меняются редко.

Рекомендуемый подход для большинства случаев: Event-Carried State Transfer в 80% сценариев. Снижает coupling, упрощает debugging (всё данные видны в событии). Используется в архитектурах Amazon, Zalando, Shopify.


Паттерн #3: Event Sourcing (события как источник истины)

Идея: Состояние системы = сумма всех событий. Не храним текущее состояние в БД, храним только события.

# Традиционный подход (CRUD)
class Order:
    id: int
    status: str  # "pending" | "paid" | "shipped" | "completed"
    total: float
 
# При изменении перезаписываем строку в БД
await db.execute("UPDATE orders SET status = 'paid' WHERE id = 123")
# Event Sourcing подход
# События (append-only log)
events = [
    {'type': 'OrderCreated', 'order_id': 123, 'total': 1500, 'timestamp': '2025-12-26T10:00:00Z'},
    {'type': 'PaymentReceived', 'order_id': 123, 'amount': 1500, 'timestamp': '2025-12-26T10:05:00Z'},
    {'type': 'OrderShipped', 'order_id': 123, 'tracking': 'TR123', 'timestamp': '2025-12-26T12:00:00Z'},
]
 
# Состояние = replay всех событий
def get_order_state(order_id):
    events = event_store.get_events(order_id)
    state = {}
    for event in events:
        if event['type'] == 'OrderCreated':
            state['status'] = 'pending'
            state['total'] = event['total']
        elif event['type'] == 'PaymentReceived':
            state['status'] = 'paid'
        elif event['type'] == 'OrderShipped':
            state['status'] = 'shipped'
    return state

Плюсы:

  • ✅ Полный audit trail (можно восстановить состояние на любой момент времени)
  • ✅ Debugging проще (видны все изменения)
  • ✅ Time travel (откат к прошлому состоянию)

Минусы:

  • ❌ Сложность (нужен event store, snapshots, projections)
  • ❌ Производительность (replay событий может быть медленным)
  • ❌ Нельзя удалить данные (GDPR проблема)

Когда использовать: Финансовые системы, аудит критичен, сложная бизнес-логика.


Паттерн #4: CQRS (Command Query Responsibility Segregation)

Идея: Разделить запись (Command) и чтение (Query) на разные модели.

┌─────────────┐
│   Client    │
└──────┬──────┘
       │
       ├─ Command (write) ─────────► Command Handler ─► Event Store
       │                                                       │
       └─ Query (read) ────────────► Read Model ◄──────────────┘
                                     (materialized view)

Пример:

# Command side (write)
class CreateOrderCommand:
    user_id: int
    items: List[Item]
 
@app.post("/orders")
async def create_order(cmd: CreateOrderCommand):
    # Валидация
    # Создание событий
    events = [
        OrderCreatedEvent(user_id=cmd.user_id, items=cmd.items),
        InventoryReservedEvent(items=cmd.items)
    ]
    # Сохранение в event store
    await event_store.append(events)
    return {"order_id": events[0].order_id}
 
# Query side (read)
@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    # Читаем из materialized view (быстро!)
    return await read_db.get_order(order_id)
 
# Projector (обновляет read model из событий)
@consumer.on('OrderCreatedEvent')
async def project_order(event):
    await read_db.insert_order({
        'id': event.order_id,
        'user_id': event.user_id,
        'status': 'pending',
        'created_at': event.timestamp
    })

Плюсы:

  • ✅ Read и write оптимизированы независимо
  • ✅ Сложные запросы не влияют на write performance
  • ✅ Можно масштабировать read и write отдельно

Минусы:

  • ❌ Сложность (две модели данных)
  • ❌ Eventual consistency (read model может отставать)

Когда использовать: Read-heavy системы, сложные аналитические запросы.

Apache KafkaRabbitMQRedis StreamsNATSAWS EventBridge

Технологии: Kafka vs RabbitMQ vs Redis Streams

Выбор event bus — критичное решение. Давайте сравним топ-3.

Apache Kafka: танк для высоких нагрузок

Когда использовать:

  • Throughput > 100k messages/sec
  • Нужна персистентность (replay событий)
  • Event sourcing / stream processing
  • Большая команда (есть кто поддерживать)

Плюсы:

  • ✅ Высокая throughput (миллионы msg/sec)
  • ✅ Persistence (события хранятся дни/месяцы)
  • ✅ Replay (можно перечитать old события)
  • ✅ Partitioning (horizontal scaling)

Минусы:

  • ❌ Сложность настройки (Zookeeper, brokers, replication)
  • ❌ Overkill для малых систем
  • ❌ Latency выше чем у RabbitMQ (optimized for throughput)

Что такое ZooKeeper? Это распределённая система координации, которая использовалась в Kafka для хранения метаданных (какие brokers живы, какие топики существуют, кто leader партиции). Представьте ZooKeeper как "адресную книгу" кластера Kafka — он отвечает на вопросы типа "где находится лидер партиции X?" или "какие consumers в группе Y?".

Хорошая новость: С версии Kafka 3.3+ (2022) появился KRaft mode (Kafka Raft), который не требует ZooKeeper. Kafka сам управляет своими метаданными. Это упрощает деплой и уменьшает количество движущихся частей. В production 2025 года рекомендуется использовать KRaft вместо ZooKeeper.

Пример (Python + aiokafka):

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
 
# Producer
producer = AIOKafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode()
)
await producer.start()
await producer.send('orders.created', {'order_id': 123})
 
# Consumer
consumer = AIOKafkaConsumer(
    'orders.created',
    bootstrap_servers='localhost:9092',
    group_id='payment-service',
    value_deserializer=lambda m: json.loads(m.decode())
)
await consumer.start()
async for msg in consumer:
    print(f"Order: {msg.value}")

Docker Compose (KRaft mode, рекомендуется в 2025):

services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
    environment:
      # KRaft mode (без ZooKeeper)
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_LISTENERS: "PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Форматирование storage (обязательно для KRaft)
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
Legacy вариант с ZooKeeper (если нужна совместимость)
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
 
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

RabbitMQ: гибкий и простой

Когда использовать:

  • Нужен гибкий routing (topic exchange, fanout, headers)
  • Priority queues
  • Небольшие/средние нагрузки (< 50k msg/sec)
  • Быстрый старт (проще настроить чем Kafka)

Плюсы:

  • ✅ Простота setup
  • ✅ Гибкий routing (exchange types)
  • ✅ UI для мониторинга (RabbitMQ Management)
  • ✅ Низкая latency

Минусы:

  • ❌ Throughput ниже чем Kafka
  • ❌ Нет персистентности по умолчанию (можно включить, но медленнее)
  • ❌ Replay сложнее

Пример (Python + aio-pika):

import aio_pika
import json
 
# Publisher
connection = await aio_pika.connect_robust("amqp://localhost/")
channel = await connection.channel()
 
await channel.default_exchange.publish(
    aio_pika.Message(
        body=json.dumps({'order_id': 123}).encode(),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
    ),
    routing_key='orders.created'
)
 
# Consumer
queue = await channel.declare_queue('payment-service-queue', durable=True)
await queue.bind(exchange='events', routing_key='orders.created')
 
async with queue.iterator() as queue_iter:
    async for message in queue_iter:
        async with message.process():
            data = json.loads(message.body.decode())
            print(f"Order: {data}")

Redis Streams: легковес для простых кейсов

Когда использовать:

  • Уже используете Redis
  • Нужна низкая latency
  • Простые use cases (pub/sub, task queue)
  • Малые/средние нагрузки

Плюсы:

  • ✅ Минимальный overhead (Redis уже есть)
  • ✅ Очень низкая latency
  • ✅ Простота (знакомый Redis API)
  • ✅ Consumer groups (как в Kafka)

Минусы:

  • ❌ In-memory (ограничено RAM)
  • ❌ Нет persistence гарантий
  • ❌ Не для больших нагрузок

Пример (Python + redis-py):

import redis.asyncio as redis
import json
 
r = await redis.from_url("redis://localhost")
 
# Producer
await r.xadd(
    'orders:created',
    {'data': json.dumps({'order_id': 123})}
)
 
# Consumer
consumer_group = 'payment-service'
await r.xgroup_create('orders:created', consumer_group, mkstream=True)
 
while True:
    messages = await r.xreadgroup(
        consumer_group,
        'worker-1',
        {'orders:created': '>'},
        count=10
    )
    for stream, msgs in messages:
        for msg_id, data in msgs:
            order = json.loads(data[b'data'])
            print(f"Order: {order}")
            await r.xack('orders:created', consumer_group, msg_id)

Сравнительная таблица

ХарактеристикаKafkaRabbitMQRedis Streams
Throughput1M+ msg/s50k msg/s100k msg/s
Latency (P99)10-50ms5-10ms1-5ms
PersistenceДа (дни/месяцы)ОпциональноIn-memory (риск потери)
ReplayДаСложноДа (limited)
ComplexityВысокаяСредняяНизкая
ScalingHorizontalVertical/ClusteringVertical
Use caseEvent sourcing, high-loadTask queues, routingSimple pub/sub, caching

Рекомендации на основе типичных use-cases:

  • Kafka: E-commerce, финтех, event sourcing (Netflix, LinkedIn, Uber)
  • RabbitMQ: Task queues, background jobs, webhooks (GitHub, Mozilla, Reddit)
  • Redis Streams: Real-time notifications, simple pub/sub (Stack Overflow, Slack)

Подводные камни Event-Driven систем

Event-Driven решает одни проблемы и создаёт другие. Вот грабли, которые регулярно встречаются при внедрении (по опыту публичных post-mortems и технических блогов).

Грабля #1: Eventual Consistency (и как с этим жить)

Проблема: Между публикацией события и его обработкой проходит время.

# User регистрируется
await db.create_user(user_id=123, email='user@example.com')
producer.send('user.registered', {'user_id': 123})
 
# ...10ms спустя...
 
# Другой сервис пытается получить user
user = await user_service.get_user(123)  # Может вернуть None!

Решение #1: Read Your Own Writes

# После создания возвращаем данные сразу
@app.post("/users")
async def create_user(user: UserCreate):
    user_entity = await db.create_user(user)
    producer.send('user.registered', user_entity.dict())
 
    # Возвращаем созданного user
    return user_entity  # Не ждём event processing!

Решение #2: Polling / WebSocket для UI

# Frontend polling
POST /users → { "user_id": 123, "status": "pending" }
 
# Frontend polls
GET /users/123{ "user_id": 123, "status": "pending" }
...
GET /users/123{ "user_id": 123, "status": "active" }  # Готово!

Решение #3: Optimistic UI

// Frontend предполагает успех
onSubmit() {
  // Показываем сразу (оптимистично)
  users.add({ id: 123, email: 'user@example.com', status: 'pending' })
 
  // Отправляем на backend
  await api.createUser(...)
 
  // WebSocket/SSE обновит status когда обработается
}

Eventual consistency — это осознанный архитектурный выбор. Пользователи привыкли к асинхронности (email приходит не мгновенно, банковские переводы обрабатываются в течение дня). Главное — показывать правильный UI (pending states, progress indicators). Этот подход используется во всех крупных платформах: Amazon (заказы), Twitter (лайки), Instagram (notifications).


Грабля #2: Дублирование событий (Idempotency обязательна)

Проблема: Событие может обработаться дважды (network retry, rebalancing).

# ❌ НЕ idempotent
@consumer.on('payment.received')
async def on_payment(event):
    # Если event обработается 2 раза → начислим бонусы дважды!
    await db.execute(
        "UPDATE users SET bonus_balance = bonus_balance + 100 WHERE id = ?",
        event['user_id']
    )

Решение: Idempotency key

# ✅ Idempotent
@consumer.on('payment.received')
async def on_payment(event):
    # Проверяем, обрабатывали ли уже это событие
    processed = await db.fetchval(
        "SELECT 1 FROM processed_events WHERE event_id = ?",
        event['event_id']
    )
    if processed:
        return  # Skip duplicate
 
    # Обрабатываем в транзакции
    async with db.transaction():
        await db.execute(
            "UPDATE users SET bonus_balance = bonus_balance + 100 WHERE id = ?",
            event['user_id']
        )
        await db.execute(
            "INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)",
            event['event_id'], datetime.utcnow()
        )

Альтернатива: Upsert операции

# PostgreSQL UPSERT (идемпотентно по дизайну)
await db.execute("""
    INSERT INTO user_bonuses (user_id, payment_id, amount)
    VALUES (?, ?, 100)
    ON CONFLICT (payment_id) DO NOTHING
""", event['user_id'], event['payment_id'])

Грабля #3: Порядок событий (Ordering не гарантирован)

Проблема: События могут прийти в неправильном порядке.

# Публикуются события
producer.send('order.created', {'order_id': 123, 'status': 'pending'})
producer.send('order.paid', {'order_id': 123, 'status': 'paid'})
 
# Consumer может получить в обратном порядке!
1. order.paid (status = paid)
2. order.created (status = pending)
 
# Результат: Order в неправильном состоянии

Решение #1: Partition key (Kafka)

# Kafka гарантирует порядок внутри partition
# Отправляем все события одного order в одну partition
await producer.send(
    'orders',
    value={'event': 'order.created', 'order_id': 123},
    key=str(123).encode()  # Partition key = order_id
)

Решение #2: Sequence number / version

# Добавляем sequence number в event
events = [
    {'order_id': 123, 'event': 'created', 'seq': 1},
    {'order_id': 123, 'event': 'paid', 'seq': 2},
    {'order_id': 123, 'event': 'shipped', 'seq': 3},
]
 
# Consumer проверяет sequence
@consumer.on('order.*')
async def on_order_event(event):
    current_seq = await db.fetchval("SELECT seq FROM orders WHERE id = ?", event['order_id'])
 
    if event['seq'] <= current_seq:
        return  # Старое событие, игнорируем
 
    # Применяем событие
    await apply_event(event)

Грабля #4: Dead Letter Queue (когда событие не обрабатывается)

Проблема: Consumer падает при обработке события.

@consumer.on('payment.received')
async def on_payment(event):
    # Внешний API упал
    await stripe.refund(event['payment_id'])  # Timeout!

Без DLQ: Событие будет retry бесконечно, блокируя очередь.

Решение: Dead Letter Queue + retry with backoff

from tenacity import retry, stop_after_attempt, wait_exponential
 
@consumer.on('payment.received')
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=60)
)
async def on_payment(event):
    try:
        await stripe.refund(event['payment_id'])
    except Exception as e:
        # После 5 попыток → отправляем в DLQ
        if should_send_to_dlq(e):
            await producer.send('dlq.payments', event)
            raise

Kafka DLQ config:

from aiokafka import AIOKafkaConsumer
 
consumer = AIOKafkaConsumer(
    'payments.received',
    enable_auto_commit=False,  # Manual commit
    max_poll_records=10
)
 
async for msg in consumer:
    try:
        await process_payment(msg.value)
        await consumer.commit()  # Success
    except Exception as e:
        # Send to DLQ
        await producer.send('dlq.payments', msg.value)
        await consumer.commit()  # Commit чтобы не retry
        logger.error(f"Failed to process {msg.value}: {e}")

Мониторинг Event-Driven систем

В синхронной системе debugging простой: смотрим stack trace. В event-driven — нужны специальные инструменты.

Метрики для event bus

Обязательные метрики:

from prometheus_client import Counter, Histogram, Gauge
 
# Количество событий
events_published = Counter(
    'events_published_total',
    'Total events published',
    ['topic', 'event_type']
)
 
# Latency обработки
event_processing_duration = Histogram(
    'event_processing_duration_seconds',
    'Event processing duration',
    ['consumer', 'topic']
)
 
# Consumer lag (Kafka specific)
consumer_lag = Gauge(
    'kafka_consumer_lag',
    'Consumer lag in messages',
    ['group', 'topic', 'partition']
)
 
# Dead letter queue size
dlq_size = Gauge(
    'dlq_messages_total',
    'Messages in DLQ',
    ['topic']
)

Использование:

@app.post("/orders")
async def create_order(order: OrderCreate):
    start_time = time.time()
 
    # Создаём заказ
    order_id = await db.create_order(order)
 
    # Публикуем событие
    producer.send('orders.created', {'order_id': order_id})
    events_published.labels(topic='orders.created', event_type='OrderCreated').inc()
 
    # Latency
    event_processing_duration.labels(
        consumer='order-service',
        topic='orders.created'
    ).observe(time.time() - start_time)
 
    return {"order_id": order_id}

Grafana dashboard для EDA

# Throughput (events/sec)
rate(events_published_total[1m])
 
# Consumer lag (warning если > 1000)
kafka_consumer_lag > 1000
 
# P99 latency
histogram_quantile(0.99, rate(event_processing_duration_seconds_bucket[5m]))
 
# DLQ growth rate (alert если растёт)
delta(dlq_messages_total[5m]) > 10

Чеклист: готовы ли вы к Event-Driven?

Инфраструктура:

  • Event bus выбран и настроен (Kafka/RabbitMQ/Redis)
  • Мониторинг и алерты на consumer lag
  • Dead Letter Queue настроен
  • Distributed tracing (OpenTelemetry) внедрён

Архитектура:

  • Определены bounded contexts (какие события публиковать)
  • Спроектирована idempotency стратегия
  • Eventual consistency приемлема для бизнеса
  • Определена стратегия обработки ошибок (retry, DLQ)

Команда:

  • Разработчики понимают eventual consistency
  • Есть опыт debugging distributed систем
  • Готовы к сложности мониторинга

Выводы

Event-Driven Architecture — это мощный инструмент, но не панацея.

Когда использовать:

  • Микросервисная архитектура с 3+ сервисами
  • Асинхронные операции (email, отчёты, ML)
  • Нужна отказоустойчивость
  • Высокая нагрузка (buffering через event bus)

Когда НЕ использовать:

  • Простой CRUD монолит
  • Команда не готова к eventual consistency
  • Нужна немедленная консистентность (финансы)

Ключевые паттерны:

  • Event Notification: минимум данных в событии
  • Event-Carried State Transfer: все данные в событии (рекомендуется для большинства случаев)
  • Event Sourcing: события = source of truth
  • CQRS: разделение read/write моделей

Выбор технологии:

  • Kafka: высокие нагрузки, event sourcing, persistence
  • RabbitMQ: гибкий routing, task queues, средние нагрузки
  • Redis Streams: простота, низкая latency, малые нагрузки

Must-have:

  • Idempotency (защита от дубликатов)
  • DLQ (обработка ошибок)
  • Мониторинг lag и latency
  • Distributed tracing

Главный урок: Начните с простого (Event Notification + RabbitMQ). Усложняйте по мере роста. Не внедряйте Kafka и Event Sourcing "на всякий случай".


Полезные материалы для дальнейшего изучения:

Блоги компаний с EDA:

Книги:

  • "Building Event-Driven Microservices" by Adam Bellemare (O'Reilly, 2020)
  • "Designing Data-Intensive Applications" by Martin Kleppmann

Связанные статьи на сайте:


Нужна помощь с архитектурой? Провожу архитектурные ревью и консультации по внедрению event-driven подхода. Пишите на почту — обсудим ваш кейс.


Подписывайтесь на обновления в Telegram — пишу про архитектуру, Python и современные практики разработки. Без воды, только применимые знания.

Похожие материалы