Skip to main content
Back to course
Архитектура высоконагруженных веб-приложений
9 / 1182%

Event-Driven: контракты, схемы и невозможность обратно откатить

90 минут

Для кого: инженеры, которые устали от «сделайте ещё один HTTP-запрос». Если ваши сервисы всё ещё жёстко связаны, а schema изменений ломает прод каждую неделю — эта глава заставит вас пересмотреть всё.

Провокация №1: вы знаете нагрузку на каждый топик сейчас?

  • Если вы не видите messages_per_sec по каждому топику, consumer_lag_seconds по каждому consumer group и schema_compatibility_errors, значит, вы слепы. Любое изменение схемы может убить прод.
  • Метрики, которые обязаны быть в Grafana: topic_messages_per_sec, consumer_lag_max, schema_registry_incompatible_total.

1. Когда нужен event-driven, а когда — нет

ПризнакEvent-drivenСинхронно
Высокая связность сервисов-
Нужна реакция на события (email, аналитика)-
Жёсткие SLA (трансакции денег в real-time)⚠️ (event + компенс.)
Простые CRUD без side-effects-

Главное: event-driven — не «модное слово». Это выбор, когда асинхронность даёт реальный выигрыш.

Провокация №2: у вас есть схема события?

  • Если событие — просто JSON без схемы (Avro/JSON Schema/Protobuf), вы гарантированно сломаете клиентов при следующем релизе.

2. Контракты и схемы

2.1 Schema Registry / Protobuf

message UserCreated {
  string event_id = 1;
  string user_id = 2;
  string email = 3;
  int64 timestamp = 4;
}
  • Версионирование: v1, v2. Добавляйте поля только как optional, не удаляйте старые без миграции.
  • Schema Registry (Kafka) или Git + CI для JSON Schema.

2.2 Эволюция схем

ИзменениеBackward compatibleForward compatibleРиск
Добавить optional поленизкий
Добавить required полевысокий
Удалить полесредний
Переименовать полекритический
Изменить типкритический

Всегда добавляйте новые поля как optional, оставляйте default. В schema registry включайте режим BACKWARD.

2.3 Contract testing

  • Автоматический CI, запрещающий breaking changes. Пример:
npm run schema:check -- --mode backward

Провокация №3: вы понимаете порядок и ключ?

  • Kafka сохраняет порядок только внутри партиции. Если вы публикуете события пользователя без ключа — вы теряете порядок.

3. Partitioning

  • Ключ = user_id → все события пользователя в одной партиции.
  • Hot key → разрезать по user_id % N.
  • Latency критична? Не держите больше 1k сообщений в партиции (lag).
await producer.send({
  topic: "user-events",
  messages: [{ key: userId, value: JSON.stringify(event) }],
});

Провокация №4: idempotency и маршрутизация

  • Если handler не идемпотентен — at-least-once превратится в «удвоенную доставку».

Idempotent consumer

CREATE TABLE processed_events(
  event_id TEXT PRIMARY KEY,
  processed_at TIMESTAMPTZ DEFAULT now()
);
if (await alreadyProcessed(event.id)) return;
await handle(event);
await markProcessed(event.id);

Провокация №5: вы знаете, что делать с «плохими» событиями?

  • DLQ = сигнал аварии. Если DLQ растёт >10 сообщений/минуту, это инцидент.

DLQ обработка

  • Алерт: dlq_messages > 0 >5 минут.
  • Авто-переотправка запрещена без анализа.

4. Kafka/Rabbit/SQS отличие

ТехнологияСценарийTradeoffs
KafkaВысокий throughput, event sourcingТребует кластер, DevOps
RabbitMQTask queues, low latencyВ памяти + persistence, умеренный throughput
SQS/SNSManaged, надёжностьЛимиты latency, цена

Провокация №6: вы тестировали reprocessing?

  • Event-sourcing и CQRS без reprocessing = сказки. Если вы не можете реплейнуть 1 million событий на staging, вы не готовы к восстановлению.

Event Design Principles

  • События = факты, именуйте в прошедшем времени (OrderCreated).
  • В payload только данные, которые действительно произошли, без предвычислений.
  • Размер → дробите большие события (вместо одного "OrderWithAllItems" раздельно "OrderCreated" и "OrderItemAdded").
  • Версионируйте: event_version и совместимость.

5. Event sourcing & CQRS

  • Event store хранит все факты. Проекции (read-модели) пересчитываются из лога.
  • Snapshots: чтобы не проигрывать миллион событий, берите snapshot каждые N событий.
class UserAggregate {
  async load(userId: string) {
    const snapshot = await snapshotStore.get(userId);
    const events = await eventStore.get(userId, snapshot?.version || 0);
    return events.reduce(applyEvent, snapshot?.state ?? {});
  }
 
  async maybeSnapshot(state: any, version: number) {
    if (version % 100 === 0) {
      await snapshotStore.save(userId, { state, version });
    }
  }
}

6. CDC и Debezium

  • Change Data Capture публикует события из БД автоматом (Debezium, Maxwell).
  • Пример коннектора:
transforms: unwrap
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: false
  • Плюсы: нет ручного кода, strong consistency.
  • Минусы: события технические (change log), нужна нормализация в бизнес-события.

7. Саги через события (choreography)

  • Каждый сервис реагирует на события и публикует следующее.
// OrderSaga
eventBus.on("OrderCreated", async (event) => {
  await eventBus.publish("PaymentRequested", { orderId: event.orderId });
});
 
eventBus.on("PaymentCompleted", async (event) => {
  await eventBus.publish("InventoryReserveRequested", {
    orderId: event.orderId,
  });
});
 
eventBus.on("InventoryReserveFailed", async (event) => {
  await eventBus.publish("OrderCancelled", { orderId: event.orderId });
});
  • Компенсации = события «Cancel». Логику нужно держать идемпотентной.

Провокация №7: вы измеряете event lag?

  • Lag = latest_offset - consumer_offset. Если lag > SLA, вы опоздали с реакцией.

Lag monitoring

  • Kafka: kafka_exporter -> consumer_lag. Alert > 1000 сообщений или >N секунд.
  • Rabbit: messages_unacked.

Провокация №8: как деградируете при падении bus?

  • Event bus умер? Сервисы стоят? Нужен план: degrade to sync, включить компенсации, уведомить бизнес.

Практика

  1. Schema evolution drill:
    • Добавьте новое optional поле в событие, проверьте backward compatibility тестом.
    • Попробуйте добавить required поле — убедитесь, что CI его запрещает.
  2. Event sourcing simulation:
    • Сохраните 1000 событий агрегата, восстановите состояние без snapshot и с snapshot (каждые 100 событий). Сравните время.
  3. CDC pipeline:
    • Настройте Debezium для таблицы orders. Поймайте событие UPDATE и построите projection в Redis.
  4. Saga choreography:
    • Реализуйте цепочку событий для заказа (order→payment→inventory). Вынудите ошибку в inventory, убедитесь, что compensating event OrderCancelled отработал.
  5. Lag + DLQ rehearsal:
    • Остановите consumer, чтобы lag вырос > 500 сообщений, убедитесь, что alert срабатывает.
    • Отправьте повреждённое событие, проверьте, что DLQ наполняется и запускается runbook.

Безопасность: не логируйте payloadы с PII. Обрабатывайте события в тестах только на staging. Реплей на проде делайте только read-only.

Что дальше

Мониторинг и observability — без них события и очереди бессмысленны. Следующая глава разберёт метрики, логи и tracing.

Event-Driven: контракты, схемы и невозможность обратно откатить — Архитектура высоконагруженных веб-приложений — Potapov.me