Перейти к содержимому

Kafka + FastAPI: введение в event-driven архитектуру

Константин Потапов
35 мин

Экспертный разбор Apache Kafka для Python-разработчиков: от концепций до production-ready интеграции с FastAPI. Практические примеры, подводные камни и тестирование.

Kafka + FastAPI: введение в event-driven архитектуру

Когда REST API уже не справляется

Представьте: ваш сервис обрабатывает заказы. При создании заказа нужно отправить email, обновить склад, начислить бонусы, уведомить курьерскую службу. Первая итерация — синхронные вызовы:

@app.post("/orders")
async def create_order(order: OrderCreate):
    db_order = await save_order(order)
 
    await send_email(order.user_email)        # 500ms latency
    await update_inventory(order.items)       # 300ms latency
    await calculate_bonuses(order.user_id)    # 200ms latency
    await notify_delivery(order.id)           # 400ms latency
 
    return db_order  # Пользователь ждёт 1.4 секунды

Проблемы:

  • Пользователь ждёт завершения всех операций — 1.4 секунды вместо 50ms
  • Если email-сервис упал → весь запрос падает
  • Retry логика усложняет код
  • Нагрузка растёт → растёт пропорционально

Второй подход — фоновые задачи через Celery. Лучше, но:

  • Celery + Redis/RabbitMQ — ещё одна инфраструктура
  • Нет гарантий доставки при перезапуске
  • Сложно отследить цепочку событий
  • Масштабирование воркеров требует ручной настройки

Kafka решает эти проблемы архитектурно:

  • Асинхронность из коробки — API отвечает мгновенно
  • Гарантии доставки через и
  • Горизонтальное масштабирование через партиции
  • Распределённая обработка с consumer groups
  • Event sourcing и событий для отладки

Kafka превращает монолит в event-driven систему, где сервисы общаются через события, а не прямые вызовы. Это не просто очередь — это распределённая база данных событий.

Apache Kafka изначально разработан в LinkedIn (2011) для обработки логов активности пользователей. Сейчас это стандарт индустрии для event streaming: Netflix, Uber, Airbnb используют Kafka для миллионов событий в секунду.

Что такое Kafka простыми словами

Аналогия: YouTube для событий

Kafka — это как YouTube, но для событий в вашей системе:

  • Producer (продюсер) — как канал, который публикует видео (события)
  • Topic (топик) — как плейлист (категория событий: "orders", "notifications")
  • Partition (партиция) — представьте, что плейлист из 1000 видео разделили на 3 папки по ~333 видео. Теперь 3 человека могут независимо смотреть свою папку параллельно — не мешая друг другу
  • Consumer (консьюмер) — как подписчик, который смотрит видео (читает события)
  • Consumer Group (группа консьюмеров) — команда зрителей, где каждый берёт одну папку (партицию): первый смотрит папку 1, второй — папку 2, третий — папку 3. Работа делится автоматически

Ключевое отличие от классических очередей (RabbitMQ, SQS):

RabbitMQ/SQS (очередь по умолчанию)Kafka
Сообщение прочитано → удаленоСобытия хранятся навсегда (или по TTL)
Сообщение уходит к одному консьюмеруМного консьюмеров читают одни и те же события
Нет истории после прочтенияМожно "перемотать назад" (replay)
Брокер распределяет сообщенияКонсьюмер сам выбирает, с какого места читать

Ключевые концепции

Topic (топик) — логическая категория событий. Примеры: orders.created, users.registered, payments.completed.

Partition (партиция) — физическое разделение топика для параллелизма. Топик orders с 3 партициями позволяет 3 консьюмерам читать одновременно.

— уникальный номер события внутри партиции. Как закладка в книге: offset 0 = первое событие, offset 100 = сотое событие. Consumer запоминает свой offset (например, "я прочитал до 150-го события") и может продолжить с этого места после перезапуска, или "перемотать назад" к offset 50.

— приложение, которое отправляет события в Kafka.

— приложение, которое читает события из Kafka.

— группа консьюмеров, которые делят между собой партиции. Kafka гарантирует, что одна партиция читается только одним консьюмером из группы.

— сервер Kafka. Кластер Kafka = несколько брокеров для отказоустойчивости.

— координатор кластера. В Kafka 3.0+ KRaft заменяет ZooKeeper.

Гарантии доставки

Kafka предлагает три уровня гарантий:

1. At most once (максимум один раз) — можем потерять:

# Producer отправил, но не дождался подтверждения
producer.send('topic', value, acks=0)  # Fire and forget

2. At least once (минимум один раз) — можем продублировать:

# Producer ждёт подтверждения, но может повторить при таймауте
producer.send('topic', value, acks=1)  # Leader подтвердил

3. Exactly once (ровно один раз):

# Идемпотентный producer + транзакции
producer = KafkaProducer(
    enable_idempotence=True,
    transactional_id='my-transactional-id'
)

Для большинства бизнес-кейсов достаточно at least once + идемпотентная обработка на стороне консьюмера.

Зачем Kafka нужен Python-разработчику

Типичные сценарии

1. Асинхронная обработка (замена Celery):

Проблема: API должен отвечать быстро, но нужно выполнить медленные операции (email, интеграции).

# ❌ Плохо: пользователь ждёт завершения всех операций
@app.post("/users/register")
async def register(user: UserCreate):
    db_user = await create_user(user)  # 50ms
 
    await send_welcome_email(user.email)      # 500ms — ждём SMTP
    await create_bonus_account(db_user.id)    # 200ms — запрос в другую БД
 
    return db_user  # Пользователь ждал 750ms вместо 50ms

С Kafka API отвечает мгновенно, а фоновый consumer обрабатывает:

# ✅ Хорошо: API отвечает быстро, обработка в фоне
@app.post("/users/register")
async def register(user: UserCreate):
    db_user = await create_user(user)  # 50ms
 
    # Отправили событие в Kafka — занимает ~5ms
    await producer.send('users.registered', {
        'user_id': db_user.id,
        'email': user.email,
        'timestamp': datetime.utcnow().isoformat()
    })
 
    return db_user  # Ответ за 55ms вместо 750ms
 
# Отдельный consumer (можно в том же приложении или отдельном)
async def handle_user_registered(event):
    # Обрабатываем в фоне, не блокируя API
    await send_welcome_email(event['email'])
    await create_bonus_account(event['user_id'])
    # Если упадёт — повторим автоматически (at-least-once)

Преимущества над Celery:

  • Kafka хранит события на диске → не потеряем при перезапуске
  • Replay событий для отладки или пересоздания состояния
  • Несколько consumer groups могут читать одни события
  • Проще масштабирование (добавили партиции → добавили consumers)

2. Микросервисная коммуникация (отказ от прямых HTTP вызовов):

Раньше сервис Order напрямую вызывал Inventory и Notification через HTTP:

# ❌ Плохо: Order Service жёстко связан с другими сервисами
@app.post("/orders")
async def create_order(order: OrderCreate):
    db_order = await save_order(order)
 
    # Прямой HTTP вызов → если Inventory упал, заказ не создаётся
    await http_client.post("http://inventory-service/reserve", {...})
    await http_client.post("http://notification-service/send", {...})
 
    return db_order

С Kafka сервисы не знают друг о друге — они реагируют на события:

# ✅ Хорошо: Order Service публикует событие и забывает
@app.post("/orders")
async def create_order(order: OrderCreate):
    db_order = await save_order(order)
 
    # Отправили событие → ответили пользователю мгновенно
    await producer.send('orders.created', {
        'order_id': db_order.id,
        'user_id': order.user_id,
        'items': order.items,
        'total': order.total
    })
 
    return db_order  # Не ждём Inventory и Notification
 
# Inventory Service — отдельное приложение, которое слушает события
async def handle_order_created(event):
    await reserve_items(event['items'])
    # Если упадёт — не повлияет на создание заказа
 
# Notification Service — ещё одно отдельное приложение
async def handle_order_created(event):
    await send_order_confirmation(event['order_id'])
    # Можно добавить новый сервис, не меняя Order Service

Преимущества:

  • Order Service не ломается, если Inventory недоступен
  • Можно добавить Analytics Service без изменения Order Service
  • Каждый сервис масштабируется независимо
  • События хранятся → можно восстановить состояние при сбое

3. :

Проблема: в обычной БД видим только текущее состояние, теряем историю изменений.

# ❌ Традиционный подход: UPDATE затирает историю
@app.post("/transfer")
async def transfer_money(from_account: str, to_account: str, amount: float):
    # Было: 5000 руб
    await db.execute(
        "UPDATE accounts SET balance = balance - ? WHERE id = ?",
        (amount, from_account)
    )
    # Стало: 4000 руб
    # Потеряли информацию: кто, когда, зачем списал 1000 руб

Event Sourcing: вместо изменения состояния пишем факты (события):

# ✅ Event Sourcing: храним все события
@app.post("/transfer")
async def transfer_money(from_account: str, to_account: str, amount: float):
    transfer_id = generate_uuid()
 
    # Событие 1: Списание
    await producer.send('accounts.debited', {
        'transfer_id': transfer_id,
        'account_id': from_account,
        'amount': -1000,
        'timestamp': datetime.utcnow().isoformat(),
        'description': f'Перевод на {to_account}'
    })
 
    # Событие 2: Зачисление
    await producer.send('accounts.credited', {
        'transfer_id': transfer_id,
        'account_id': to_account,
        'amount': 1000,
        'timestamp': datetime.utcnow().isoformat(),
        'description': f'Перевод от {from_account}'
    })
 
# Consumer восстанавливает текущее состояние из событий
async def rebuild_balance(account_id: str) -> float:
    events = await get_account_events(account_id)  # Читаем из Kafka
    balance = 0
    for event in events:
        if event['type'] == 'debited':
            balance -= event['amount']
        elif event['type'] == 'credited':
            balance += event['amount']
    return balance

Преимущества:

  • Полная история: можем ответить "почему баланс такой?"
  • Аудит из коробки: кто, когда, сколько
  • Replay: пересоздать состояние на любую дату
  • Отладка: воспроизвести ошибку из событий
  • Compliance: для финтеха и банков это требование

4. Real-time аналитика:

Проблема: нужно считать метрики в реальном времени (популярные страницы, активные пользователи), но запросы к БД медленные.

# ❌ Традиционный подход: пишем каждый клик в БД
@app.get("/products/{item_id}")
async def get_product(item_id: int, user_id: int):
    # Записываем клик в PostgreSQL
    await db.execute(
        "INSERT INTO page_views (user_id, page, timestamp) VALUES (?, ?, ?)",
        (user_id, f'/products/{item_id}', datetime.utcnow())
    )
    # При 1000 RPS это 1000 INSERT/s в БД — узкое горло
 
    # Получаем популярные товары — медленный запрос
    popular = await db.execute(
        "SELECT page, COUNT(*) FROM page_views WHERE timestamp > now() - interval '1 hour' GROUP BY page"
    )
    # На больших объёмах это секунды ожидания

С Kafka можно агрегировать метрики в памяти consumer:

# ✅ Хорошо: пишем события в Kafka, агрегируем в памяти
@app.get("/products/{item_id}")
async def get_product(item_id: int, user_id: int):
    # Просто отправили событие — занимает 1-5ms
    await producer.send('user.clicks', {
        'user_id': user_id,
        'page': f'/products/{item_id}',
        'timestamp': time.time(),
        'session_id': request.cookies.get('session_id')
    })
 
    # Читаем популярные товары из Redis (обновляется consumer)
    popular = await redis.get('popular_products_1h')  # < 1ms
    return popular
 
# Consumer агрегирует клики в реальном времени
page_counters = defaultdict(int)  # В памяти
 
async def process_clicks(event):
    page = event['page']
 
    # Инкрементируем счётчик в памяти
    page_counters[page] += 1
 
    # Раз в 10 секунд сбрасываем в Redis
    if should_flush():
        top_pages = sorted(page_counters.items(), key=lambda x: x[1], reverse=True)[:10]
        await redis.set('popular_products_1h', json.dumps(top_pages))
 
    # Трекаем путь пользователя для рекомендаций
    await track_user_journey(event['user_id'], page)

Преимущества:

  • Разгрузили БД: вместо 1000 INSERT/s → запись в Kafka
  • Real-time: счётчики обновляются каждые 10 секунд, а не раз в час
  • Масштабирование: добавили партиций → добавили consumers
  • Несколько consumer groups: один для аналитики, другой для рекомендаций

Когда Kafka избыточен

НЕ используйте Kafka, если:

  • У вас монолит без планов на микросервисы
  • Нагрузка < 100 событий в секунду (хватит Redis/RabbitMQ)
  • Нужны транзакции между сервисами (используйте Saga pattern)
  • Команда не готова поддерживать ещё одну инфраструктуру
  • Критична строгая очерёдность событий (Kafka гарантирует порядок только внутри партиции)

Используйте Kafka, если:

  • Нужна горизонтальная масштабируемость
  • Критична отказоустойчивость и durability
  • Есть множество консьюмеров одних событий
  • Нужен replay (пересмотр истории событий)
  • Планируете event sourcing или

Установка и запуск Kafka

Docker Compose (рекомендуемый способ)

Создайте docker-compose.yml:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
 
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
 
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

Запуск:

docker compose up -d
 
# Проверка статуса
docker compose ps
 
# Логи
docker compose logs -f kafka
 
# UI для мониторинга: http://localhost:8080

В production используйте минимум 3 брокера для отказоустойчивости. Docker Compose подходит только для dev/testing.

Создание топика

Топик — это логическая категория событий в Kafka. Перед началом работы нужно создать топики, в которые будем писать события.

Создадим топик orders с 3 партициями:

# Через docker exec
docker compose exec kafka kafka-topics \
  --create \
  --topic orders \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

Что означают параметры:

  • --topic orders — имя топика (категория событий)
  • --partitions 3 — количество партиций для параллельной обработки. 3 партиции = до 3 консьюмеров могут читать одновременно
  • --replication-factor 1 — количество копий данных (1 = без резервирования). В production используйте минимум 2-3

Проверим, что топик создан:

# Просмотр всех топиков
docker compose exec kafka kafka-topics \
  --list \
  --bootstrap-server localhost:9092
 
# Детали топика: партиции, реплики, конфигурация
docker compose exec kafka kafka-topics \
  --describe \
  --topic orders \
  --bootstrap-server localhost:9092
 
# Вывод:
# Topic: orders  PartitionCount: 3  ReplicationFactor: 1
#   Partition: 0  Leader: 1  Replicas: 1  Isr: 1
#   Partition: 1  Leader: 1  Replicas: 1  Isr: 1
#   Partition: 2  Leader: 1  Replicas: 1  Isr: 1

Топики можно создавать автоматически при первой отправке события, если включена опция auto.create.topics.enable=true. Но в production лучше создавать топики явно с нужными параметрами.

Python клиенты для Kafka

Сравнение библиотек

БиблиотекаAsyncTypedProduction ReadyКогда использовать
aiokafka⚠️FastAPI, async Python
confluent-kafka-pythonSync код, максимальная производительность
kafka-python⚠️Legacy проекты (не активно поддерживается)
faustStream processing (альтернатива Kafka Streams)

Для FastAPI выбираем aiokafka — нативная поддержка async/await и интеграция с asyncio event loop.

Установка aiokafka

pip install aiokafka
 
# Для сериализации JSON
pip install aiokafka orjson
 
# Для Avro схем (optional, для production)
pip install aiokafka fastavro

Что такое ?

В примерах статьи мы используем JSON для простоты:

await producer.send('orders', {'order_id': 123, 'total': 5000})

Проблемы JSON в production:

  • ❌ Много лишних байт: {"order_id":123} — имена полей в каждом событии
  • ❌ Нет контроля схемы: можно отправить {"order_id": "abc"} (строка вместо числа)
  • ❌ Сложная эволюция: добавили поле → старые consumer ломаются

Avro решает эти проблемы:

  • ✅ Компактный бинарный формат: {"order_id":123} (JSON 17 байт) → 5 байт (Avro)
  • ✅ Строгая типизация: schema валидирует данные перед отправкой
  • ✅ Эволюция схемы: можно добавлять поля с default значениями

Как работает Avro:

  1. Определяем схему (один раз):
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "order_id", "type": "int" },
    { "name": "total", "type": "float" },
    { "name": "status", "type": "string", "default": "pending" }
  ]
}
  1. Отправляем события (схема не передаётся, только ID + данные):
from fastavro import schemaless_writer
 
# Вместо 50 байт JSON → 10 байт Avro
await producer.send('orders', avro_serialize(order_data, schema))
  1. Schema Registry хранит версии схем:
    • v1: order_id, total
    • v2: order_id, total, status (новое поле с default)
    • Старые consumers читают v1 и v2 без проблем

Когда использовать Avro:

  • Production системы с большим объёмом данных (экономия трафика)
  • Множество команд работают с одним топиком (контракт схемы)
  • Долгоживущие системы (эволюция без breaking changes)

Для обучения и MVP достаточно JSON. Переход на Avro — это оптимизация на этапе масштабирования.

Интеграция Kafka с FastAPI

Архитектура приложения

kafka-fastapi/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── config.py            # Настройки Kafka
│   ├── kafka/
│   │   ├── __init__.py
│   │   ├── producer.py      # Kafka Producer
│   │   ├── consumer.py      # Kafka Consumer
│   │   └── topics.py        # Топики и схемы
│   ├── models.py            # Pydantic схемы
│   ├── routers/
│   │   └── orders.py        # API endpoints
│   └── consumers/
│       └── order_handler.py # Обработчики событий
├── tests/
│   ├── conftest.py
│   └── test_kafka.py
├── docker-compose.yml
├── requirements.txt
└── README.md

Настройки (app/config.py)

from pydantic_settings import BaseSettings, SettingsConfigDict
 
class Settings(BaseSettings):
    # Kafka
    kafka_bootstrap_servers: list[str] = ["localhost:9092"]
    kafka_consumer_group: str = "fastapi-app"
    kafka_auto_offset_reset: str = "earliest"  # "earliest" или "latest"
 
    # Topics
    orders_topic: str = "orders"
    notifications_topic: str = "notifications"
 
    # App
    app_name: str = "Kafka FastAPI"
    debug: bool = False
 
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )
 
settings = Settings()

Что такое auto_offset_reset и когда что использовать?

Параметр auto_offset_reset определяет, с какого места читать топик, если для consumer group нет сохранённого offset (первый запуск или offset устарел).

earliest — читать с самого начала топика:

kafka_auto_offset_reset: str = "earliest"

Когда использовать:

  • ✅ Первый запуск consumer — нужно обработать все исторические события
  • ✅ Event Sourcing — восстанавливаем состояние из всех событий
  • ✅ Аналитика — считаем метрики за всё время
  • ✅ Dev/Testing — хотим видеть все тестовые события

Плюсы:

  • Не потеряем ни одного события
  • Можно восстановить состояние с нуля

Минусы:

  • При первом запуске обработает ВСЕ события в топике (может быть миллионы)
  • Долгий старт, если топик большой

latest — читать только новые события:

kafka_auto_offset_reset: str = "latest"

Когда использовать:

  • ✅ Production — не нужна история, только текущие события
  • ✅ Уведомления — отправляем только новые, старые неактуальны
  • ✅ Real-time мониторинг — интересны только свежие метрики
  • ✅ После долгого downtime — не хотим обрабатывать накопившиеся события

Плюсы:

  • Быстрый старт — читаем только с момента подключения
  • Не перегружаем систему при первом запуске

Минусы:

  • Потеряем все события, которые были до запуска consumer
  • Не подходит для Event Sourcing

Практические примеры:

# Сервис уведомлений — только новые события
kafka_auto_offset_reset: str = "latest"
# Запустили → получаем только свежие заказы → отправляем email
 
# Сервис аналитики — вся история
kafka_auto_offset_reset: str = "earliest"
# Запустили → обработали все клики за месяц → построили отчёт
 
# Сервис обработки заказов — зависит от ситуации
kafka_auto_offset_reset: str = "earliest"  # Dev: обработаем тестовые заказы
kafka_auto_offset_reset: str = "latest"    # Prod: только новые после деплоя

Важно: После первого запуска и commit offset параметр auto_offset_reset больше не влияет — consumer продолжает с сохранённого offset. Параметр работает только когда offset отсутствует.

Проверить текущий offset:

docker compose exec kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group fastapi-app

Kafka Producer (app/kafka/producer.py)

import json
import logging
from typing import Any, Optional
 
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
 
from app.config import settings
 
logger = logging.getLogger(__name__)
 
class KafkaProducerClient:
    """
    Singleton Kafka Producer для отправки событий.
 
    Использование:
        producer = await get_kafka_producer()
        await producer.send('orders', {'order_id': 123})
    """
 
    def __init__(self):
        self.producer: Optional[AIOKafkaProducer] = None
 
    async def start(self):
        """Создаёт и запускает producer"""
        self.producer = AIOKafkaProducer(
            bootstrap_servers=settings.kafka_bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            # Гарантии доставки
            acks='all',  # Ждём подтверждения от всех реплик
            retries=3,   # Повторные попытки
            # Производительность
            compression_type='gzip',
            max_batch_size=16384,
            linger_ms=10,  # Ждём 10ms для батчинга
        )
        await self.producer.start()
        logger.info("Kafka Producer started")
 
    async def stop(self):
        """Останавливает producer"""
        if self.producer:
            await self.producer.stop()
            logger.info("Kafka Producer stopped")
 
    async def send(
        self,
        topic: str,
        value: dict[str, Any],
        key: Optional[str] = None,
    ) -> None:
        """
        Отправляет событие в Kafka.
 
        Args:
            topic: Имя топика
            value: Payload события (будет сериализован в JSON)
            key: Ключ для партиционирования (optional)
        """
        if not self.producer:
            raise RuntimeError("Producer not started")
 
        try:
            # Ключ определяет партицию (события с одним ключом → одна партиция)
            key_bytes = key.encode('utf-8') if key else None
 
            await self.producer.send_and_wait(
                topic=topic,
                value=value,
                key=key_bytes,
            )
 
            logger.debug(f"Sent to {topic}: {value}")
 
        except KafkaError as e:
            logger.error(f"Failed to send to {topic}: {e}")
            raise
 
# Singleton instance
_producer_client: Optional[KafkaProducerClient] = None
 
async def get_kafka_producer() -> KafkaProducerClient:
    """Dependency injection для FastAPI"""
    global _producer_client
 
    if _producer_client is None:
        _producer_client = KafkaProducerClient()
        await _producer_client.start()
 
    return _producer_client

Потокобезопасность Producer: почему синглетон безопасен в asyncio

Вопрос: Singleton producer + параллельные запросы = проблемы с многопоточностью?

Ответ: В asyncio это безопасно, но есть нюансы:

✅ Безопасно в asyncio (FastAPI по умолчанию):

FastAPI работает в одном event loop, все запросы обрабатываются в одном потоке через корутины:

# Все эти запросы выполняются в одном потоке
async def endpoint1(): await producer.send(...)  # Корутина 1
async def endpoint2(): await producer.send(...)  # Корутина 2
async def endpoint3(): await producer.send(...)  # Корутина 3
# asyncio переключается между корутинами, но всё в одном потоке

Почему это работает:

  • aiokafka.AIOKafkaProducer написан для asyncio
  • Использует await для переключения корутин, а не блокирующие операции
  • Один producer обслуживает тысячи параллельных send() без проблем

❌ НЕ безопасно при реальной многопоточности:

Если запускаете FastAPI с несколькими workers через uvicorn:

# ❌ Плохо: 4 процесса = 4 event loop = race condition на синглетоне
uvicorn app.main:app --workers 4

Каждый worker — это отдельный процесс с собственным синглетоном, поэтому проблем нет. Но если используете threading внутри приложения:

# ❌ ОПАСНО: threading + общий producer
import threading
 
def send_in_thread():
    # aiokafka НЕ thread-safe!
    producer.send(...)  # Race condition
 
thread = threading.Thread(target=send_in_thread)
thread.start()

Решение для threading:

Используйте confluent-kafka-python (синхронный, thread-safe):

from confluent_kafka import Producer
 
# Thread-safe producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
 
def send_in_thread():
    producer.produce('topic', value=b'data')
    producer.flush()  # Дождаться отправки

Или создавайте producer в каждом потоке:

async def send_in_thread():
    # Каждый поток создаёт свой producer
    local_producer = AIOKafkaProducer(...)
    await local_producer.start()
    await local_producer.send(...)
    await local_producer.stop()

Практические рекомендации:

  1. FastAPI + uvicorn (один worker) → синглетон producer безопасен ✅
  2. FastAPI + uvicorn (несколько workers) → каждый worker имеет свой синглетон ✅
  3. FastAPI + threading/multiprocessing внутри → используйте confluent-kafka или локальные producers ⚠️
  4. Celery workers + Kafka → каждый worker создаёт свой producer при старте ✅

Проверка потокобезопасности:

import asyncio
 
async def test_concurrent_sends():
    """Тест параллельной отправки"""
    producer = await get_kafka_producer()
 
    # 1000 параллельных отправок
    tasks = [
        producer.send('test', {'id': i})
        for i in range(1000)
    ]
 
    await asyncio.gather(*tasks)
    print("✅ Все 1000 событий отправлены без ошибок")

Итог: Для FastAPI + aiokafka синглетон producer — правильное решение. Проблемы возникают только при смешивании asyncio с threading, что редко нужно в реальных приложениях.

Kafka Consumer (app/kafka/consumer.py)

import asyncio
import json
import logging
from typing import Callable, Awaitable
 
from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError
 
from app.config import settings
 
logger = logging.getLogger(__name__)
 
class KafkaConsumerClient:
    """
    Kafka Consumer для чтения событий.
 
    Использование:
        consumer = KafkaConsumerClient('orders', handler_func)
        await consumer.start()
    """
 
    def __init__(
        self,
        topic: str,
        handler: Callable[[dict], Awaitable[None]],
        group_id: Optional[str] = None,
    ):
        self.topic = topic
        self.handler = handler
        self.group_id = group_id or settings.kafka_consumer_group
        self.consumer: Optional[AIOKafkaConsumer] = None
        self._running = False
 
    async def start(self):
        """Создаёт consumer и начинает обработку"""
        self.consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=settings.kafka_bootstrap_servers,
            group_id=self.group_id,
            auto_offset_reset=settings.kafka_auto_offset_reset,
            enable_auto_commit=False,  # Ручной commit для надёжности
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        )
 
        await self.consumer.start()
        logger.info(f"Consumer started for topic: {self.topic}")
 
        self._running = True
        asyncio.create_task(self._consume_loop())
 
    async def stop(self):
        """Останавливает consumer"""
        self._running = False
        if self.consumer:
            await self.consumer.stop()
            logger.info(f"Consumer stopped for topic: {self.topic}")
 
    async def _consume_loop(self):
        """Основной цикл чтения событий"""
        try:
            async for message in self.consumer:
                try:
                    # Обрабатываем событие
                    await self.handler(message.value)
 
                    # Commit offset после успешной обработки
                    await self.consumer.commit()
 
                    logger.debug(
                        f"Processed message from {self.topic}: "
                        f"partition={message.partition}, offset={message.offset}"
                    )
 
                except Exception as e:
                    logger.error(f"Error processing message: {e}", exc_info=True)
                    # Не делаем commit → сообщение будет перечитано
 
        except KafkaError as e:
            logger.error(f"Kafka error: {e}")
        except Exception as e:
            logger.error(f"Unexpected error: {e}", exc_info=True)

FastAPI Application (app/main.py)

from contextlib import asynccontextmanager
 
from fastapi import FastAPI
 
from app.config import settings
from app.kafka.producer import get_kafka_producer, _producer_client
from app.consumers.order_handler import start_order_consumer, stop_order_consumer
from app.routers import orders
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifecycle: startup and shutdown"""
 
    # Startup: запускаем Kafka Producer
    await get_kafka_producer()
 
    # Startup: запускаем Kafka Consumers
    await start_order_consumer()
 
    yield
 
    # Shutdown: останавливаем Producer и Consumers
    if _producer_client:
        await _producer_client.stop()
 
    await stop_order_consumer()
 
app = FastAPI(
    title=settings.app_name,
    lifespan=lifespan,
)
 
# Подключаем роутеры
app.include_router(orders.router)
 
@app.get("/health")
async def health():
    """Health check"""
    return {"status": "healthy"}

API Endpoints (app/routers/orders.py)

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
 
from app.kafka.producer import KafkaProducerClient, get_kafka_producer
from app.config import settings
 
router = APIRouter(prefix="/orders", tags=["orders"])
 
class OrderCreate(BaseModel):
    """Схема создания заказа"""
    user_id: int
    items: list[str] = Field(..., min_items=1)
    total_amount: float = Field(..., gt=0)
 
class OrderResponse(BaseModel):
    """Ответ API"""
    order_id: int
    status: str = "pending"
    message: str = "Order is being processed"
 
@router.post("/", response_model=OrderResponse, status_code=status.HTTP_202_ACCEPTED)
async def create_order(
    order: OrderCreate,
    producer: KafkaProducerClient = Depends(get_kafka_producer),
):
    """
    Создаёт заказ и отправляет событие в Kafka.
 
    Возвращает 202 Accepted — обработка асинхронная.
    """
 
    # Генерируем ID (в реальности — из БД)
    order_id = 12345  # Заглушка
 
    # Отправляем событие в Kafka
    await producer.send(
        topic=settings.orders_topic,
        value={
            "event_type": "order.created",
            "order_id": order_id,
            "user_id": order.user_id,
            "items": order.items,
            "total_amount": order.total_amount,
        },
        key=str(order.user_id),  # Партиция по user_id
    )
 
    return OrderResponse(order_id=order_id)

Event Handler (app/consumers/order_handler.py)

import logging
 
from app.kafka.consumer import KafkaConsumerClient
from app.config import settings
 
logger = logging.getLogger(__name__)
 
# Глобальная переменная для consumer
_order_consumer: Optional[KafkaConsumerClient] = None
 
async def handle_order_created(event: dict):
    """
    Обработчик события order.created.
 
    Здесь выполняются все side-effects:
    - Отправка email
    - Обновление склада
    - Начисление бонусов
    """
    order_id = event["order_id"]
    user_id = event["user_id"]
 
    logger.info(f"Processing order {order_id} for user {user_id}")
 
    try:
        # Эмуляция работы
        await send_order_email(user_id, order_id)
        await update_inventory(event["items"])
        await calculate_bonuses(user_id, event["total_amount"])
 
        logger.info(f"Order {order_id} processed successfully")
 
    except Exception as e:
        logger.error(f"Failed to process order {order_id}: {e}")
        raise  # Retry через Kafka
 
async def send_order_email(user_id: int, order_id: int):
    """Заглушка отправки email"""
    logger.info(f"Email sent to user {user_id} for order {order_id}")
 
async def update_inventory(items: list[str]):
    """Заглушка обновления склада"""
    logger.info(f"Inventory updated for items: {items}")
 
async def calculate_bonuses(user_id: int, amount: float):
    """Заглушка начисления бонусов"""
    bonuses = amount * 0.05
    logger.info(f"Bonuses {bonuses} calculated for user {user_id}")
 
async def start_order_consumer():
    """Запускает consumer для обработки заказов"""
    global _order_consumer
 
    _order_consumer = KafkaConsumerClient(
        topic=settings.orders_topic,
        handler=handle_order_created,
    )
 
    await _order_consumer.start()
 
async def stop_order_consumer():
    """Останавливает consumer"""
    if _order_consumer:
        await _order_consumer.stop()

Запуск приложения

# Запускаем Kafka
docker compose up -d
 
# Запускаем FastAPI
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
 
# Тестируем API
curl -X POST http://localhost:8000/orders/ \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": 123,
    "items": ["item1", "item2"],
    "total_amount": 5000
  }'
 
# Смотрим логи consumer
# Должны увидеть: "Processing order 12345 for user 123"

Подводные камни и решения

1. Порядок обработки событий

Проблема: События из разных партиций обрабатываются параллельно.

# User 123: два события в разных партициях
await producer.send('orders', {'user_id': 123, 'action': 'create'})  # → partition 0
await producer.send('orders', {'user_id': 123, 'action': 'cancel'})  # → partition 2
 
# Consumer может обработать 'cancel' раньше 'create'!

Решение: Используйте ключ партиции для группировки связанных событий:

# Все события user_id=123 попадут в одну партицию
await producer.send(
    topic='orders',
    value={'user_id': 123, 'action': 'create'},
    key=str(123)  # Ключ определяет партицию
)

Kafka гарантирует порядок внутри партиции, но не между партициями.

2. Идемпотентность обработки

Проблема: At-least-once delivery → событие может обработаться дважды.

async def handle_payment(event):
    # ❌ Плохо: повторная обработка = двойное списание
    await charge_user(event['user_id'], event['amount'])

Решение: Делайте обработку идемпотентной:

async def handle_payment(event):
    payment_id = event['payment_id']
 
    # Проверяем, обработано ли уже
    if await is_payment_processed(payment_id):
        logger.info(f"Payment {payment_id} already processed, skipping")
        return
 
    # Обрабатываем
    await charge_user(event['user_id'], event['amount'])
 
    # Сохраняем факт обработки
    await mark_payment_processed(payment_id)

3. Dead Letter Queue (DLQ)

Проблема: Событие не обрабатывается (ошибка валидации, bug) → блокирует партицию.

Решение: Отправляйте проблемные события в :

async def _consume_loop(self):
    """Цикл с обработкой ошибок"""
 
    async for message in self.consumer:
        try:
            await self.handler(message.value)
            await self.consumer.commit()
 
        except ValidationError as e:
            # Невалидное событие → в DLQ
            logger.error(f"Validation error: {e}")
            await self._send_to_dlq(message, error=str(e))
            await self.consumer.commit()  # Пропускаем
 
        except Exception as e:
            # Неожиданная ошибка → retry
            logger.error(f"Processing error: {e}")
            # Не делаем commit → Kafka повторит
 
async def _send_to_dlq(self, message, error: str):
    """Отправляет в Dead Letter Queue"""
 
    await self.dlq_producer.send(
        topic=f"{self.topic}.dlq",
        value={
            "original_message": message.value,
            "error": error,
            "timestamp": datetime.utcnow().isoformat(),
        },
    )

4. Backpressure (перегрузка consumer)

Проблема: Producer пишет быстрее, чем consumer читает → растёт .

Мониторинг lag:

docker compose exec kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group fastapi-app
 
# Вывод:
# TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# orders   0          1000            5000            4000  ← Проблема!

Решение 1: Добавить consumer в группу (горизонтальное масштабирование):

# Запускаем несколько инстансов приложения
# Kafka автоматически распределит партиции между ними

Решение 2: Увеличить количество партиций:

docker compose exec kafka kafka-topics \
  --alter \
  --topic orders \
  --partitions 6 \
  --bootstrap-server localhost:9092

Решение 3: Batch processing:

async def _consume_loop(self):
    """Обрабатываем батчами"""
 
    batch = []
    batch_size = 100
 
    async for message in self.consumer:
        batch.append(message)
 
        if len(batch) >= batch_size:
            await self._process_batch(batch)
            await self.consumer.commit()
            batch = []
 
async def _process_batch(self, messages):
    """Параллельная обработка батча"""
 
    tasks = [self.handler(msg.value) for msg in messages]
    await asyncio.gather(*tasks, return_exceptions=True)

5. Rebalancing (перебалансировка)

Проблема: При добавлении/удалении consumer Kafka перераспределяет партиции → обработка останавливается на несколько секунд ().

Решение: Graceful shutdown:

import signal
 
async def shutdown(signal, loop):
    """Graceful shutdown при SIGTERM"""
 
    logger.info(f"Received exit signal {signal.name}...")
 
    # Останавливаем consumer (завершает текущие сообщения)
    await stop_order_consumer()
 
    # Останавливаем producer
    if _producer_client:
        await _producer_client.stop()
 
    # Останавливаем event loop
    loop.stop()
 
# Регистрируем обработчики
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
    loop.add_signal_handler(
        sig,
        lambda s=sig: asyncio.create_task(shutdown(s, loop))
    )

Тестирование Kafka

Unit тесты (моки)

import pytest
from unittest.mock import AsyncMock, patch
 
@pytest.mark.asyncio
async def test_create_order_sends_event():
    """Проверяем, что API отправляет событие в Kafka"""
 
    # Мокируем producer
    mock_producer = AsyncMock()
 
    with patch('app.routers.orders.get_kafka_producer', return_value=mock_producer):
        from app.routers.orders import create_order
 
        order = OrderCreate(user_id=123, items=["item1"], total_amount=100)
        response = await create_order(order, producer=mock_producer)
 
        # Проверяем вызов
        mock_producer.send.assert_called_once()
        call_args = mock_producer.send.call_args
 
        assert call_args.kwargs['topic'] == 'orders'
        assert call_args.kwargs['value']['user_id'] == 123
        assert response.order_id > 0

Integration тесты (Testcontainers)

import pytest
from testcontainers.kafka import KafkaContainer
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
 
@pytest.fixture(scope="session")
def kafka_container():
    """Запускает Kafka в Docker для тестов"""
 
    with KafkaContainer() as kafka:
        yield kafka
 
@pytest.mark.asyncio
async def test_kafka_producer_consumer(kafka_container):
    """E2E тест: producer → Kafka → consumer"""
 
    bootstrap_servers = kafka_container.get_bootstrap_server()
 
    # Producer
    producer = AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode(),
    )
    await producer.start()
 
    # Consumer
    consumer = AIOKafkaConsumer(
        'test-topic',
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        value_deserializer=lambda m: json.loads(m.decode()),
    )
    await consumer.start()
 
    # Отправляем событие
    test_message = {'test': 'data'}
    await producer.send_and_wait('test-topic', test_message)
 
    # Читаем событие
    async for message in consumer:
        assert message.value == test_message
        break  # Прочитали одно сообщение → выходим
 
    await producer.stop()
    await consumer.stop()

Локальное тестирование с Kafka UI

  1. Откройте http://localhost:8080 (Kafka UI из Docker Compose)
  2. Выберите топик orders
  3. Вручную отправьте тестовое событие через UI
  4. Проверьте логи consumer в приложении

Мониторинг и наблюдаемость

Ключевые метрики

Producer метрики:

  • record-send-rate — событий в секунду
  • record-error-rate — ошибки отправки
  • request-latency-avg — задержка отправки

Consumer метрики:

  • records-consumed-rate — событий в секунду
  • records-lag-max — максимальный lag (критично!)
  • commit-latency-avg — задержка

Kafka метрики:

  • Disk usage (events хранятся на диске)
  • Network I/O ()
  • (проблемы с репликацией)

Prometheus + Grafana

Добавьте JMX Exporter для экспорта метрик Kafka в Prometheus:

# Добавьте в docker-compose.yml
kafka:
  environment:
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
  ports:
    - "9101:9101"
 
jmx-exporter:
  image: sscaling/jmx-prometheus-exporter
  ports:
    - "5556:5556"
  environment:
    SERVICE_PORT: 5556
  volumes:
    - ./jmx-exporter-config.yml:/etc/jmx-exporter/config.yml
  command:
    - "5556"
    - "/etc/jmx-exporter/config.yml"

Grafana dashboard: Используйте готовый Kafka Overview dashboard.

Структурное логирование

import structlog
 
logger = structlog.get_logger()
 
async def handle_order_created(event: dict):
    """Обработчик с structured logging"""
 
    logger.info(
        "order.processing.started",
        order_id=event["order_id"],
        user_id=event["user_id"],
        total_amount=event["total_amount"],
    )
 
    try:
        await process_order(event)
 
        logger.info(
            "order.processing.completed",
            order_id=event["order_id"],
            duration_ms=123,
        )
 
    except Exception as e:
        logger.error(
            "order.processing.failed",
            order_id=event["order_id"],
            error=str(e),
            exc_info=True,
        )
        raise

Структурные логи легко парсить в ELK/Loki для аналитики.

Production Checklist

Перед деплоем

  • : минимум 3 брокера, replication factor >= 2
  • : настройте log.retention.hours (default: 168h = 7 дней)
  • Партиции: рассчитайте количество (2-3x количество консьюмеров)
  • Acks: acks=all для критичных данных
  • Idempotence: включите enable.idempotence=True
  • : используйте gzip или snappy
  • Monitoring: настройте алерты на lag и error rate
  • DLQ: реализуйте Dead Letter Queue
  • Graceful shutdown: обрабатывайте SIGTERM
  • Тесты: integration тесты с Testcontainers

Безопасность

# SSL/TLS encryption
kafka:
  environment:
    KAFKA_SECURITY_PROTOCOL: SSL
    KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.keystore.jks
    KAFKA_SSL_KEYSTORE_PASSWORD: changeit
 
# SASL Authentication
KAFKA_SASL_MECHANISM: PLAIN
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN

Capacity planning

Формула throughput:

Max Throughput = (Partitions × Consumer Processing Rate)

Пример: 10 партиций, каждый consumer обрабатывает 100 msg/s → max 1000 msg/s.

Формула storage:

Storage = (Events per day × Avg event size × Retention days)

Пример: 10M событий/день × 1KB × 7 дней = 70GB.

Итоги

Вы научились:

  • Понимать архитектуру Kafka — топики, партиции, consumer groups
  • Интегрировать с FastAPI — async producer и consumer
  • Обрабатывать события — идемпотентность, retry, DLQ
  • Избегать подводных камней — порядок, rebalancing, backpressure
  • Тестировать — unit, integration, e2e
  • Мониторить — метрики, логи, алерты

Следующие шаги:

  1. Изучите Kafka Streams для stream processing
  2. Попробуйте Schema Registry (Avro) для версионирования схем
  3. Изучите KSQL для SQL-запросов к событиям
  4. Реализуйте Saga pattern для распределённых транзакций
  5. Настройте multi-region replication для disaster recovery

Полезные ссылки:

Kafka — мощный инструмент для построения масштабируемых distributed систем. Начните с простого use case (async notifications), освойте паттерны, и постепенно переходите к event sourcing и .

Kafka превращает ваш монолит в event-driven систему за неделю, но потребует месяцы на освоение всех нюансов. Начинайте с одного топика и одного consumer — усложняйте постепенно.

Глоссарий

Ключевые термины, используемые в статье:

Идемпотентность
Свойство операции, которую можно выполнить многократно без изменения результата после первого применения. Например: установка значения x=5 идемпотентна, а x=x+5 — нет. В Kafka идемпотентность защищает от дубликатов при retry.
Acknowledgement (ACK)
Подтверждение получения сообщения. Producer может требовать ACK от лидера партиции (acks=1), от всех реплик (acks=all) или не ждать вообще (acks=0).
Apache Avro
Бинарный формат сериализации данных от Apache (назван в честь британского производителя самолётов Avro). В отличие от JSON, Avro компактнее (меньше размер), быстрее (бинарный формат) и поддерживает эволюцию схемы. Схема хранится отдельно в Schema Registry, в событиях передаётся только ID схемы + данные.
Broker
Сервер Kafka, который хранит данные и обрабатывает запросы от producer и consumer. Кластер Kafka состоит из нескольких брокеров для отказоустойчивости и масштабирования.
Commit
Операция сохранения offset в Kafka, которая помечает событие как обработанное. Commit можно делать автоматически (auto.commit) или вручную после успешной обработки для гарантии at-least-once.
Consumer
Приложение (или его часть), которое читает события из Kafka. Consumer подписывается на топики, читает события с определённых партиций и коммитит offset после обработки.
Consumer Group
Логическая группа консьюмеров, которые делят между собой партиции одного топика. Kafka автоматически распределяет партиции между консьюмерами группы. При добавлении/удалении консьюмера происходит rebalancing.
CQRS
Command Query Responsibility Segregation — паттерн разделения моделей для записи (Command) и чтения (Query). Команды изменяют состояние через события, а запросы читают из оптимизированных read-моделей. Kafka идеально подходит для CQRS: события в топиках = команды, consumer строят read-модели в БД/Redis.
Dead Letter Queue (DLQ)
Специальный топик для событий, которые не удалось обработать после всех retry попыток. Позволяет не блокировать обработку здоровых событий и анализировать проблемы отдельно.
Event Sourcing
Паттерн хранения данных, где вместо текущего состояния сохраняется последовательность событий. Состояние восстанавливается проигрыванием всех событий. Даёт полную историю изменений и возможность replay.
Lag
Отставание consumer от последнего события в партиции. Измеряется в количестве необработанных событий. Высокий lag (тысячи событий) означает, что consumer не успевает обрабатывать поток данных.
Latency
Задержка между отправкой запроса и получением ответа. Измеряется в миллисекундах (ms). В высоконагруженных системах важны процентили: p50, p95, p99.
Offset
Уникальный порядковый номер события внутри партиции. Начинается с 0 и увеличивается при каждом новом событии. Consumer хранит свой offset чтобы знать, до какого места он прочитал топик.
Persistence
Сохранение данных на диск для долговременного хранения. В Kafka события записываются на диск брокера и могут храниться днями/неделями, в отличие от in-memory очередей.
Producer
Приложение (или его часть), которое отправляет события в Kafka. Producer выбирает топик и опционально ключ партиции, затем сериализует данные и отправляет брокеру.
Rebalancing
Процесс перераспределения партиций между консьюмерами в Consumer Group. Происходит при добавлении/удалении консьюмера или изменении количества партиций. Во время rebalancing обработка событий приостанавливается.
Replay
Возможность перечитать события из прошлого, 'перемотав' offset назад. Полезно для отладки, восстановления данных или создания новых проекций из исторических событий.
Replication Factor
Количество копий каждой партиции на разных брокерах. При replication factor=3 данные хранятся на трёх брокерах. Защищает от потери данных при падении брокера. Минимум 2 для production, рекомендуется 3.
Retention
Время хранения событий в Kafka. По умолчанию 7 дней (168 часов). Старые события автоматически удаляются. Можно настроить по времени (log.retention.hours) или размеру (log.retention.bytes).
Throughput
Пропускная способность системы — количество событий, обрабатываемых в единицу времени. Измеряется в RPS (requests per second) или events/sec. В Kafka зависит от количества партиций и производительности consumer.
Under-replicated partitions
Партиции, у которых не все реплики синхронизированы. Означает проблемы с брокерами или сетью. Критическая метрика для мониторинга — высокое значение грозит потерей данных при падении лидера.
ZooKeeper / KRaft
Система координации кластера Kafka. ZooKeeper отвечал за выбор лидера, хранение метаданных и координацию брокеров. С Kafka 3.0+ используется KRaft — встроенный механизм без внешних зависимостей.