"Мы добавили в систему real-time уведомления. Теперь при создании заказа делается 7 синхронных HTTP-запросов, и если хотя бы один сервис тормозит — весь checkout падает. Users жалуются, CEO требует голов."
Типичная ситуация в растущих стартапах (подобные истории регулярно появляются в блогах Stripe, Airbnb, финтех-компаний). Классическая синхронная архитектура с красивым фасадом превращается в карточный домик: один медленный сервис валит всю систему.
Результат после внедрения event-driven подхода: P99 latency checkout падает с 2.3 секунд до 180мс. Availability растет с 99.5% до 99.95%. Команда перестает получать алерты в 3 ночи (по опыту миграций в Netflix, Uber, других компаний).
В этой статье — практический гайд по Event-Driven Architecture без евангелизма и магического мышления. Только типичные production-кейсы, код на Python, метрики и честные грабли, которые регулярно встречаются в индустрии.
Правда об Event-Driven Architecture: это не про Kafka
Начну с неудобной правды: Event-Driven Architecture — это не технология. Это подход к проектированию систем.
Представьте две модели общения:
Request-Response (синхронная модель) — как телефонный звонок:
Вы: "Алло, какая погода?"
Друг: "Подожди... (ищет информацию) ...+15°C"
Вы: (ждёте всё это время с трубкой у уха)
Event-Driven (асинхронная модель) — как новостная лента:
Вы: (публикуете событие "Хочу знать погоду")
Погодный сервис: (видит событие и публикует "+15°C" в ленту)
Вы: (увидите ответ, когда будете готовы)
Главное отличие: в первом случае вы ждёте, во втором — продолжаете жить.
Типичный сценарий: как синхронные вызовы убивают checkout
E-commerce стартап решает "улучшить UX" и добавляет новые фичи при создании заказа:
Что должно произойти при checkout:
- Проверить user в Auth Service (50ms)
- Валидировать товары в Catalog Service (80ms)
- Проверить промокод в Discount Service (120ms)
- Резервировать товар на складе в Inventory Service (200ms)
- Создать заказ в Order Service (100ms)
- Списать деньги в Payment Service (300ms)
- Отправить email через Notification Service (150ms)
Синхронная реализация:
# ❌ Так мы сделали (и поплатились)
from fastapi import FastAPI, HTTPException
import httpx
app = FastAPI()
@app.post("/checkout")
async def checkout(order: CheckoutRequest):
async with httpx.AsyncClient(timeout=5.0) as client:
# 1. Проверяем пользователя
user = await client.get(f"http://auth-service/users/{order.user_id}")
if user.status_code != 200:
raise HTTPException(401, "User not found")
# 2. Валидируем товары
catalog = await client.post(
"http://catalog-service/validate",
json={"items": order.items}
)
if catalog.status_code != 200:
raise HTTPException(400, "Invalid items")
# 3. Проверяем промокод
discount = await client.post(
"http://discount-service/validate",
json={"code": order.promo_code}
)
# 4. Резервируем на складе
inventory = await client.post(
"http://inventory-service/reserve",
json={"items": order.items}
)
if inventory.status_code != 200:
raise HTTPException(400, "Out of stock")
# 5. Создаём заказ
order_response = await client.post(
"http://order-service/orders",
json=order.dict()
)
# 6. Списываем деньги
payment = await client.post(
"http://payment-service/charge",
json={"amount": order.total, "user_id": order.user_id}
)
if payment.status_code != 200:
# О нет! Заказ создан, но оплата не прошла!
# Нужен rollback... но как?
raise HTTPException(500, "Payment failed")
# 7. Отправляем email
await client.post(
"http://notification-service/send",
json={"email": user.json()["email"], "order_id": order_response.json()["id"]}
)
return {"order_id": order_response.json()["id"]}Распространенные проблемы такого подхода:
Проблема #1: Latency складывается
Best case: 50 + 80 + 120 + 200 + 100 + 300 + 150 = 1000ms
Worst case (P99): 200 + 300 + 500 + 800 + 400 + 1200 + 600 = 4000ms (4 секунды!)
Проблема #2: Cascading failures
Notification Service упал (DNS timeout 30s)
→ Checkout ждёт 30 секунд
→ Users видят spinning loader
→ Users жмут F5
→ Создаются duplicate заказы
→ Chaos
Проблема #3: Частичный rollback невозможен
Сценарий:
1. Заказ создан ✅
2. Резерв на складе сделан ✅
3. Payment Service упал ❌
Результат: Товар зарезервирован, но не оплачен.
Решение: ??? (manual cleanup в админке)
Типичные метрики такой системы под нагрузкой:
- P99 latency checkout: 3.8+ секунд
- Success rate: 92% (8% падают из-за timeouts)
- Duplicate orders: ~5% (users жмут F5 из-за медленной загрузки)
- Неудовлетворенные клиенты: растут экспоненциально
Синхронные HTTP-вызовы между микросервисами — это как цепь из слабых звеньев. Одно звено ломается → вся цепь рвётся. Latency складывается, failures каскадируются, rollback превращается в кошмар.
Event-Driven подход: архитектурное решение
Ключевая идея: Разделить checkout на две фазы:
Фаза 1 (синхронная): Критичная бизнес-логика
- Валидация данных
- Резерв на складе (must succeed сейчас)
- Создание заказа в статусе
PENDING
Фаза 2 (асинхронная): Всё остальное через события
- Списание денег
- Email уведомления
- Аналитика
- Обновление рекомендаций
Новая архитектура:
# ✅ Event-driven подход
from fastapi import FastAPI, HTTPException
from kafka import KafkaProducer
import json
app = FastAPI()
# Kafka producer (можно Redis Streams, RabbitMQ и т.д.)
producer = KafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
@app.post("/checkout")
async def checkout(order: CheckoutRequest):
# СИНХРОННО (критично для UX)
# 1. Быстрая валидация
validate_order_data(order) # 5ms, в памяти
# 2. Резерв на складе (единственный HTTP-запрос!)
async with httpx.AsyncClient(timeout=2.0) as client:
inventory = await client.post(
"http://inventory-service/reserve",
json={"items": order.items}
)
if inventory.status_code != 200:
raise HTTPException(400, "Out of stock")
# 3. Создаём заказ в статусе PENDING
order_id = await db.create_order(order, status="PENDING")
# АСИНХРОННО (через события)
# Публикуем событие — дальше не наша проблема
producer.send('orders.created', {
'order_id': order_id,
'user_id': order.user_id,
'items': order.items,
'total': order.total,
'timestamp': datetime.utcnow().isoformat()
})
# Возвращаем результат мгновенно!
return {
"order_id": order_id,
"status": "pending",
"message": "Order created, payment processing"
}Consumers (отдельные сервисы слушают события):
# Payment Service (слушает события "orders.created")
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'orders.created',
bootstrap_servers='kafka:9092',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='payment-service'
)
for message in consumer:
order = message.value
try:
# Списываем деньги
payment = await charge_payment(order['user_id'], order['total'])
# Публикуем событие об успехе
producer.send('payments.completed', {
'order_id': order['order_id'],
'payment_id': payment.id,
'status': 'success'
})
except PaymentError as e:
# Публикуем событие об ошибке
producer.send('payments.failed', {
'order_id': order['order_id'],
'reason': str(e)
})# Notification Service (слушает события "payments.completed")
consumer = KafkaConsumer(
'payments.completed',
bootstrap_servers='kafka:9092',
group_id='notification-service'
)
for message in consumer:
payment = message.value
# Отправляем email (даже если упадёт — не повлияет на checkout)
await send_email(
user_id=payment['user_id'],
template='order_confirmation',
data={'order_id': payment['order_id']}
)Типичные результаты после внедрения (по опыту миграций в индустрии):
Экономия на инфраструктуре: В типичных случаях количество инстансов снижается на 20-40% (за счёт меньшей нагрузки и лучшего scaling). Это ~$500-1000/месяц для средних проектов.
Главное: Улучшается UX (быстрый checkout), снижается количество инцидентов, команда получает меньше алертов по ночам.
Когда использовать Event-Driven Architecture
Event-Driven — это не серебряная пуля. Есть случаи, когда синхронный подход проще и лучше.
Выбирайте EDA, если ответили "ДА" на 3+ вопроса:
1. Есть асинхронные операции?
Примеры:
- Отправка email/SMS
- Генерация отчётов
- Обработка изображений/видео
- ML-инференс
- Аналитика
2. Нужна отказоустойчивость?
Сценарий: Email-сервис упал
❌ Синхронно: Checkout падает для всех
✅ Event-driven: Checkout работает, emails отправятся позже
3. Микросервисная архитектура?
5+ сервисов → высокий риск cascading failures
Event-driven снижает coupling между сервисами
4. Высокая нагрузка?
> 1000 req/s → синхронные вызовы тормозят
Event bus работает как буфер, сглаживает пики нагрузки
5. Нужна аудит-трейл / Event Sourcing?
Требование: восстановить состояние системы на любой момент времени
Event log = полная история всех изменений
6. Разные скорости обработки?
Fast: API endpoint отвечает за 50ms
Slow: Email генерируется 2 секунды
Event-driven позволяет не ждать slow операций
НЕ используйте EDA, если:
❌ Простая CRUD-система
Todo-app, блог, простая админка
→ Синхронный REST API проще и понятнее
❌ Нужна немедленная консистентность
Банковская транзакция: деньги должны списаться СЕЙЧАС
→ Eventual consistency не подходит
❌ Команда не готова
Debugging event-driven систем сложнее
Нужны:
- Distributed tracing
- Event monitoring
- Понимание eventual consistency
❌ Один монолит, 3 разработчика
Overhead от event bus не оправдан
→ Начните с модульного монолита
Best practice из индустрии: Если операция должна завершиться ДО того, как пользователь получит ответ — используйте синхронный вызов. Всё остальное — кандидат на события. Этот подход используется в Netflix, Uber, Spotify и других high-scale системах.
Основные паттерны Event-Driven систем
Event-Driven — это не просто "бросить событие в Kafka". Есть несколько паттернов с разными trade-offs.
Паттерн #1: Event Notification (простые уведомления)
Идея: Событие содержит минимум данных, только ID и тип события.
# Publisher
producer.send('user.registered', {
'event_id': 'evt_123',
'user_id': 'usr_456',
'timestamp': '2025-12-26T10:00:00Z'
})
# Consumer (должен запросить данные сам)
@app.on_event('user.registered')
async def on_user_registered(event):
# Делаем HTTP-запрос за данными
user = await user_service.get_user(event['user_id'])
await send_welcome_email(user.email)Плюсы:
- ✅ Маленький размер событий
- ✅ Нет дублирования данных
Минусы:
- ❌ Consumer зависит от source service (coupling)
- ❌ Дополнительный HTTP-запрос (latency)
Когда использовать: Данные часто меняются, и актуальность критична.
Паттерн #2: Event-Carried State Transfer (события с данными)
Идея: Событие содержит ВСЕ необходимые данные.
# Publisher
producer.send('user.registered', {
'event_id': 'evt_123',
'user_id': 'usr_456',
'email': 'user@example.com',
'name': 'John Doe',
'created_at': '2025-12-26T10:00:00Z',
'subscription_plan': 'premium'
})
# Consumer (не делает HTTP-запросов!)
@app.on_event('user.registered')
async def on_user_registered(event):
# Всё уже есть в событии
await send_welcome_email(
email=event['email'],
name=event['name']
)Плюсы:
- ✅ Consumer не зависит от source service
- ✅ Нет дополнительных HTTP-запросов
- ✅ Можно обработать событие, даже если source service упал
Минусы:
- ❌ Большой размер событий (больше трафика)
- ❌ Дублирование данных (риск inconsistency)
Когда использовать: Consumers должны работать независимо, данные меняются редко.
Рекомендуемый подход для большинства случаев: Event-Carried State Transfer в 80% сценариев. Снижает coupling, упрощает debugging (всё данные видны в событии). Используется в архитектурах Amazon, Zalando, Shopify.
Паттерн #3: Event Sourcing (события как источник истины)
Идея: Состояние системы = сумма всех событий. Не храним текущее состояние в БД, храним только события.
# Традиционный подход (CRUD)
class Order:
id: int
status: str # "pending" | "paid" | "shipped" | "completed"
total: float
# При изменении перезаписываем строку в БД
await db.execute("UPDATE orders SET status = 'paid' WHERE id = 123")# Event Sourcing подход
# События (append-only log)
events = [
{'type': 'OrderCreated', 'order_id': 123, 'total': 1500, 'timestamp': '2025-12-26T10:00:00Z'},
{'type': 'PaymentReceived', 'order_id': 123, 'amount': 1500, 'timestamp': '2025-12-26T10:05:00Z'},
{'type': 'OrderShipped', 'order_id': 123, 'tracking': 'TR123', 'timestamp': '2025-12-26T12:00:00Z'},
]
# Состояние = replay всех событий
def get_order_state(order_id):
events = event_store.get_events(order_id)
state = {}
for event in events:
if event['type'] == 'OrderCreated':
state['status'] = 'pending'
state['total'] = event['total']
elif event['type'] == 'PaymentReceived':
state['status'] = 'paid'
elif event['type'] == 'OrderShipped':
state['status'] = 'shipped'
return stateПлюсы:
- ✅ Полный audit trail (можно восстановить состояние на любой момент времени)
- ✅ Debugging проще (видны все изменения)
- ✅ Time travel (откат к прошлому состоянию)
Минусы:
- ❌ Сложность (нужен event store, snapshots, projections)
- ❌ Производительность (replay событий может быть медленным)
- ❌ Нельзя удалить данные (GDPR проблема)
Когда использовать: Финансовые системы, аудит критичен, сложная бизнес-логика.
Паттерн #4: CQRS (Command Query Responsibility Segregation)
Идея: Разделить запись (Command) и чтение (Query) на разные модели.
┌─────────────┐
│ Client │
└──────┬──────┘
│
├─ Command (write) ─────────► Command Handler ─► Event Store
│ │
└─ Query (read) ────────────► Read Model ◄──────────────┘
(materialized view)
Пример:
# Command side (write)
class CreateOrderCommand:
user_id: int
items: List[Item]
@app.post("/orders")
async def create_order(cmd: CreateOrderCommand):
# Валидация
# Создание событий
events = [
OrderCreatedEvent(user_id=cmd.user_id, items=cmd.items),
InventoryReservedEvent(items=cmd.items)
]
# Сохранение в event store
await event_store.append(events)
return {"order_id": events[0].order_id}
# Query side (read)
@app.get("/orders/{order_id}")
async def get_order(order_id: int):
# Читаем из materialized view (быстро!)
return await read_db.get_order(order_id)
# Projector (обновляет read model из событий)
@consumer.on('OrderCreatedEvent')
async def project_order(event):
await read_db.insert_order({
'id': event.order_id,
'user_id': event.user_id,
'status': 'pending',
'created_at': event.timestamp
})Плюсы:
- ✅ Read и write оптимизированы независимо
- ✅ Сложные запросы не влияют на write performance
- ✅ Можно масштабировать read и write отдельно
Минусы:
- ❌ Сложность (две модели данных)
- ❌ Eventual consistency (read model может отставать)
Когда использовать: Read-heavy системы, сложные аналитические запросы.
Технологии: Kafka vs RabbitMQ vs Redis Streams
Выбор event bus — критичное решение. Давайте сравним топ-3.
Apache Kafka: танк для высоких нагрузок
Когда использовать:
- Throughput > 100k messages/sec
- Нужна персистентность (replay событий)
- Event sourcing / stream processing
- Большая команда (есть кто поддерживать)
Плюсы:
- ✅ Высокая throughput (миллионы msg/sec)
- ✅ Persistence (события хранятся дни/месяцы)
- ✅ Replay (можно перечитать old события)
- ✅ Partitioning (horizontal scaling)
Минусы:
- ❌ Сложность настройки (Zookeeper, brokers, replication)
- ❌ Overkill для малых систем
- ❌ Latency выше чем у RabbitMQ (optimized for throughput)
Что такое ZooKeeper? Это распределённая система координации, которая использовалась в Kafka для хранения метаданных (какие brokers живы, какие топики существуют, кто leader партиции). Представьте ZooKeeper как "адресную книгу" кластера Kafka — он отвечает на вопросы типа "где находится лидер партиции X?" или "какие consumers в группе Y?".
Хорошая новость: С версии Kafka 3.3+ (2022) появился KRaft mode (Kafka Raft), который не требует ZooKeeper. Kafka сам управляет своими метаданными. Это упрощает деплой и уменьшает количество движущихся частей. В production 2025 года рекомендуется использовать KRaft вместо ZooKeeper.
Пример (Python + aiokafka):
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
# Producer
producer = AIOKafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode()
)
await producer.start()
await producer.send('orders.created', {'order_id': 123})
# Consumer
consumer = AIOKafkaConsumer(
'orders.created',
bootstrap_servers='localhost:9092',
group_id='payment-service',
value_deserializer=lambda m: json.loads(m.decode())
)
await consumer.start()
async for msg in consumer:
print(f"Order: {msg.value}")Docker Compose (KRaft mode, рекомендуется в 2025):
services:
kafka:
image: confluentinc/cp-kafka:7.6.0
ports:
- "9092:9092"
environment:
# KRaft mode (без ZooKeeper)
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
KAFKA_LISTENERS: "PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# Форматирование storage (обязательно для KRaft)
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"Legacy вариант с ZooKeeper (если нужна совместимость)
services:
kafka:
image: confluentinc/cp-kafka:7.6.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181RabbitMQ: гибкий и простой
Когда использовать:
- Нужен гибкий routing (topic exchange, fanout, headers)
- Priority queues
- Небольшие/средние нагрузки (< 50k msg/sec)
- Быстрый старт (проще настроить чем Kafka)
Плюсы:
- ✅ Простота setup
- ✅ Гибкий routing (exchange types)
- ✅ UI для мониторинга (RabbitMQ Management)
- ✅ Низкая latency
Минусы:
- ❌ Throughput ниже чем Kafka
- ❌ Нет персистентности по умолчанию (можно включить, но медленнее)
- ❌ Replay сложнее
Пример (Python + aio-pika):
import aio_pika
import json
# Publisher
connection = await aio_pika.connect_robust("amqp://localhost/")
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(
body=json.dumps({'order_id': 123}).encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
),
routing_key='orders.created'
)
# Consumer
queue = await channel.declare_queue('payment-service-queue', durable=True)
await queue.bind(exchange='events', routing_key='orders.created')
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
data = json.loads(message.body.decode())
print(f"Order: {data}")Redis Streams: легковес для простых кейсов
Когда использовать:
- Уже используете Redis
- Нужна низкая latency
- Простые use cases (pub/sub, task queue)
- Малые/средние нагрузки
Плюсы:
- ✅ Минимальный overhead (Redis уже есть)
- ✅ Очень низкая latency
- ✅ Простота (знакомый Redis API)
- ✅ Consumer groups (как в Kafka)
Минусы:
- ❌ In-memory (ограничено RAM)
- ❌ Нет persistence гарантий
- ❌ Не для больших нагрузок
Пример (Python + redis-py):
import redis.asyncio as redis
import json
r = await redis.from_url("redis://localhost")
# Producer
await r.xadd(
'orders:created',
{'data': json.dumps({'order_id': 123})}
)
# Consumer
consumer_group = 'payment-service'
await r.xgroup_create('orders:created', consumer_group, mkstream=True)
while True:
messages = await r.xreadgroup(
consumer_group,
'worker-1',
{'orders:created': '>'},
count=10
)
for stream, msgs in messages:
for msg_id, data in msgs:
order = json.loads(data[b'data'])
print(f"Order: {order}")
await r.xack('orders:created', consumer_group, msg_id)Сравнительная таблица
| Характеристика | Kafka | RabbitMQ | Redis Streams |
|---|---|---|---|
| Throughput | 1M+ msg/s | 50k msg/s | 100k msg/s |
| Latency (P99) | 10-50ms | 5-10ms | 1-5ms |
| Persistence | Да (дни/месяцы) | Опционально | In-memory (риск потери) |
| Replay | Да | Сложно | Да (limited) |
| Complexity | Высокая | Средняя | Низкая |
| Scaling | Horizontal | Vertical/Clustering | Vertical |
| Use case | Event sourcing, high-load | Task queues, routing | Simple pub/sub, caching |
Рекомендации на основе типичных use-cases:
- Kafka: E-commerce, финтех, event sourcing (Netflix, LinkedIn, Uber)
- RabbitMQ: Task queues, background jobs, webhooks (GitHub, Mozilla, Reddit)
- Redis Streams: Real-time notifications, simple pub/sub (Stack Overflow, Slack)
Подводные камни Event-Driven систем
Event-Driven решает одни проблемы и создаёт другие. Вот грабли, которые регулярно встречаются при внедрении (по опыту публичных post-mortems и технических блогов).
Грабля #1: Eventual Consistency (и как с этим жить)
Проблема: Между публикацией события и его обработкой проходит время.
# User регистрируется
await db.create_user(user_id=123, email='user@example.com')
producer.send('user.registered', {'user_id': 123})
# ...10ms спустя...
# Другой сервис пытается получить user
user = await user_service.get_user(123) # Может вернуть None!Решение #1: Read Your Own Writes
# После создания возвращаем данные сразу
@app.post("/users")
async def create_user(user: UserCreate):
user_entity = await db.create_user(user)
producer.send('user.registered', user_entity.dict())
# Возвращаем созданного user
return user_entity # Не ждём event processing!Решение #2: Polling / WebSocket для UI
# Frontend polling
POST /users → { "user_id": 123, "status": "pending" }
# Frontend polls
GET /users/123 → { "user_id": 123, "status": "pending" }
...
GET /users/123 → { "user_id": 123, "status": "active" } # Готово!Решение #3: Optimistic UI
// Frontend предполагает успех
onSubmit() {
// Показываем сразу (оптимистично)
users.add({ id: 123, email: 'user@example.com', status: 'pending' })
// Отправляем на backend
await api.createUser(...)
// WebSocket/SSE обновит status когда обработается
}Eventual consistency — это осознанный архитектурный выбор. Пользователи привыкли к асинхронности (email приходит не мгновенно, банковские переводы обрабатываются в течение дня). Главное — показывать правильный UI (pending states, progress indicators). Этот подход используется во всех крупных платформах: Amazon (заказы), Twitter (лайки), Instagram (notifications).
Грабля #2: Дублирование событий (Idempotency обязательна)
Проблема: Событие может обработаться дважды (network retry, rebalancing).
# ❌ НЕ idempotent
@consumer.on('payment.received')
async def on_payment(event):
# Если event обработается 2 раза → начислим бонусы дважды!
await db.execute(
"UPDATE users SET bonus_balance = bonus_balance + 100 WHERE id = ?",
event['user_id']
)Решение: Idempotency key
# ✅ Idempotent
@consumer.on('payment.received')
async def on_payment(event):
# Проверяем, обрабатывали ли уже это событие
processed = await db.fetchval(
"SELECT 1 FROM processed_events WHERE event_id = ?",
event['event_id']
)
if processed:
return # Skip duplicate
# Обрабатываем в транзакции
async with db.transaction():
await db.execute(
"UPDATE users SET bonus_balance = bonus_balance + 100 WHERE id = ?",
event['user_id']
)
await db.execute(
"INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)",
event['event_id'], datetime.utcnow()
)Альтернатива: Upsert операции
# PostgreSQL UPSERT (идемпотентно по дизайну)
await db.execute("""
INSERT INTO user_bonuses (user_id, payment_id, amount)
VALUES (?, ?, 100)
ON CONFLICT (payment_id) DO NOTHING
""", event['user_id'], event['payment_id'])Грабля #3: Порядок событий (Ordering не гарантирован)
Проблема: События могут прийти в неправильном порядке.
# Публикуются события
producer.send('order.created', {'order_id': 123, 'status': 'pending'})
producer.send('order.paid', {'order_id': 123, 'status': 'paid'})
# Consumer может получить в обратном порядке!
1. order.paid (status = paid)
2. order.created (status = pending)
# Результат: Order в неправильном состоянииРешение #1: Partition key (Kafka)
# Kafka гарантирует порядок внутри partition
# Отправляем все события одного order в одну partition
await producer.send(
'orders',
value={'event': 'order.created', 'order_id': 123},
key=str(123).encode() # Partition key = order_id
)Решение #2: Sequence number / version
# Добавляем sequence number в event
events = [
{'order_id': 123, 'event': 'created', 'seq': 1},
{'order_id': 123, 'event': 'paid', 'seq': 2},
{'order_id': 123, 'event': 'shipped', 'seq': 3},
]
# Consumer проверяет sequence
@consumer.on('order.*')
async def on_order_event(event):
current_seq = await db.fetchval("SELECT seq FROM orders WHERE id = ?", event['order_id'])
if event['seq'] <= current_seq:
return # Старое событие, игнорируем
# Применяем событие
await apply_event(event)Грабля #4: Dead Letter Queue (когда событие не обрабатывается)
Проблема: Consumer падает при обработке события.
@consumer.on('payment.received')
async def on_payment(event):
# Внешний API упал
await stripe.refund(event['payment_id']) # Timeout!Без DLQ: Событие будет retry бесконечно, блокируя очередь.
Решение: Dead Letter Queue + retry with backoff
from tenacity import retry, stop_after_attempt, wait_exponential
@consumer.on('payment.received')
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=1, max=60)
)
async def on_payment(event):
try:
await stripe.refund(event['payment_id'])
except Exception as e:
# После 5 попыток → отправляем в DLQ
if should_send_to_dlq(e):
await producer.send('dlq.payments', event)
raiseKafka DLQ config:
from aiokafka import AIOKafkaConsumer
consumer = AIOKafkaConsumer(
'payments.received',
enable_auto_commit=False, # Manual commit
max_poll_records=10
)
async for msg in consumer:
try:
await process_payment(msg.value)
await consumer.commit() # Success
except Exception as e:
# Send to DLQ
await producer.send('dlq.payments', msg.value)
await consumer.commit() # Commit чтобы не retry
logger.error(f"Failed to process {msg.value}: {e}")Мониторинг Event-Driven систем
В синхронной системе debugging простой: смотрим stack trace. В event-driven — нужны специальные инструменты.
Метрики для event bus
Обязательные метрики:
from prometheus_client import Counter, Histogram, Gauge
# Количество событий
events_published = Counter(
'events_published_total',
'Total events published',
['topic', 'event_type']
)
# Latency обработки
event_processing_duration = Histogram(
'event_processing_duration_seconds',
'Event processing duration',
['consumer', 'topic']
)
# Consumer lag (Kafka specific)
consumer_lag = Gauge(
'kafka_consumer_lag',
'Consumer lag in messages',
['group', 'topic', 'partition']
)
# Dead letter queue size
dlq_size = Gauge(
'dlq_messages_total',
'Messages in DLQ',
['topic']
)Использование:
@app.post("/orders")
async def create_order(order: OrderCreate):
start_time = time.time()
# Создаём заказ
order_id = await db.create_order(order)
# Публикуем событие
producer.send('orders.created', {'order_id': order_id})
events_published.labels(topic='orders.created', event_type='OrderCreated').inc()
# Latency
event_processing_duration.labels(
consumer='order-service',
topic='orders.created'
).observe(time.time() - start_time)
return {"order_id": order_id}Grafana dashboard для EDA
# Throughput (events/sec)
rate(events_published_total[1m])
# Consumer lag (warning если > 1000)
kafka_consumer_lag > 1000
# P99 latency
histogram_quantile(0.99, rate(event_processing_duration_seconds_bucket[5m]))
# DLQ growth rate (alert если растёт)
delta(dlq_messages_total[5m]) > 10Чеклист: готовы ли вы к Event-Driven?
Инфраструктура:
- Event bus выбран и настроен (Kafka/RabbitMQ/Redis)
- Мониторинг и алерты на consumer lag
- Dead Letter Queue настроен
- Distributed tracing (OpenTelemetry) внедрён
Архитектура:
- Определены bounded contexts (какие события публиковать)
- Спроектирована idempotency стратегия
- Eventual consistency приемлема для бизнеса
- Определена стратегия обработки ошибок (retry, DLQ)
Команда:
- Разработчики понимают eventual consistency
- Есть опыт debugging distributed систем
- Готовы к сложности мониторинга
Выводы
Event-Driven Architecture — это мощный инструмент, но не панацея.
Когда использовать:
- Микросервисная архитектура с 3+ сервисами
- Асинхронные операции (email, отчёты, ML)
- Нужна отказоустойчивость
- Высокая нагрузка (buffering через event bus)
Когда НЕ использовать:
- Простой CRUD монолит
- Команда не готова к eventual consistency
- Нужна немедленная консистентность (финансы)
Ключевые паттерны:
- Event Notification: минимум данных в событии
- Event-Carried State Transfer: все данные в событии (рекомендуется для большинства случаев)
- Event Sourcing: события = source of truth
- CQRS: разделение read/write моделей
Выбор технологии:
- Kafka: высокие нагрузки, event sourcing, persistence
- RabbitMQ: гибкий routing, task queues, средние нагрузки
- Redis Streams: простота, низкая latency, малые нагрузки
Must-have:
- Idempotency (защита от дубликатов)
- DLQ (обработка ошибок)
- Мониторинг lag и latency
- Distributed tracing
Главный урок: Начните с простого (Event Notification + RabbitMQ). Усложняйте по мере роста. Не внедряйте Kafka и Event Sourcing "на всякий случай".
Полезные материалы для дальнейшего изучения:
Блоги компаний с EDA:
- Netflix Tech Blog — архитектура streaming-платформы
- Uber Engineering — event-driven системы в real-time
- Stripe Blog — idempotency и надежность
- AWS Architecture Blog — паттерны событийных систем
Книги:
- "Building Event-Driven Microservices" by Adam Bellemare (O'Reilly, 2020)
- "Designing Data-Intensive Applications" by Martin Kleppmann
Связанные статьи на сайте:
- Монолит → Микросервисы — как разделять систему
- Kafka + FastAPI — практический гайд по Kafka
- Мониторинг стек — observability для distributed систем
Нужна помощь с архитектурой? Провожу архитектурные ревью и консультации по внедрению event-driven подхода. Пишите на почту — обсудим ваш кейс.
Подписывайтесь на обновления в Telegram — пишу про архитектуру, Python и современные практики разработки. Без воды, только применимые знания.

