Когда REST API уже не справляется
Представьте: ваш сервис обрабатывает заказы. При создании заказа нужно отправить email, обновить склад, начислить бонусы, уведомить курьерскую службу. Первая итерация — синхронные вызовы:
@app.post("/orders")
async def create_order(order: OrderCreate):
db_order = await save_order(order)
await send_email(order.user_email) # 500ms latency
await update_inventory(order.items) # 300ms latency
await calculate_bonuses(order.user_id) # 200ms latency
await notify_delivery(order.id) # 400ms latency
return db_order # Пользователь ждёт 1.4 секундыПроблемы:
- Пользователь ждёт завершения всех операций — 1.4 секунды вместо 50ms
- Если email-сервис упал → весь запрос падает
- Retry логика усложняет код
- Нагрузка растёт → растёт пропорционально
Второй подход — фоновые задачи через Celery. Лучше, но:
- Celery + Redis/RabbitMQ — ещё одна инфраструктура
- Нет гарантий доставки при перезапуске
- Сложно отследить цепочку событий
- Масштабирование воркеров требует ручной настройки
Kafka решает эти проблемы архитектурно:
- Асинхронность из коробки — API отвечает мгновенно
- Гарантии доставки через и
- Горизонтальное масштабирование через партиции
- Распределённая обработка с consumer groups
- Event sourcing и событий для отладки
Kafka превращает монолит в event-driven систему, где сервисы общаются через события, а не прямые вызовы. Это не просто очередь — это распределённая база данных событий.
Apache Kafka изначально разработан в LinkedIn (2011) для обработки логов активности пользователей. Сейчас это стандарт индустрии для event streaming: Netflix, Uber, Airbnb используют Kafka для миллионов событий в секунду.
Что такое Kafka простыми словами
Аналогия: YouTube для событий
Kafka — это как YouTube, но для событий в вашей системе:
- Producer (продюсер) — как канал, который публикует видео (события)
- Topic (топик) — как плейлист (категория событий: "orders", "notifications")
- Partition (партиция) — представьте, что плейлист из 1000 видео разделили на 3 папки по ~333 видео. Теперь 3 человека могут независимо смотреть свою папку параллельно — не мешая друг другу
- Consumer (консьюмер) — как подписчик, который смотрит видео (читает события)
- Consumer Group (группа консьюмеров) — команда зрителей, где каждый берёт одну папку (партицию): первый смотрит папку 1, второй — папку 2, третий — папку 3. Работа делится автоматически
Ключевое отличие от классических очередей (RabbitMQ, SQS):
| RabbitMQ/SQS (очередь по умолчанию) | Kafka |
|---|---|
| Сообщение прочитано → удалено | События хранятся навсегда (или по TTL) |
| Сообщение уходит к одному консьюмеру | Много консьюмеров читают одни и те же события |
| Нет истории после прочтения | Можно "перемотать назад" (replay) |
| Брокер распределяет сообщения | Консьюмер сам выбирает, с какого места читать |
Ключевые концепции
Topic (топик) — логическая категория событий. Примеры: orders.created, users.registered, payments.completed.
Partition (партиция) — физическое разделение топика для параллелизма. Топик orders с 3 партициями позволяет 3 консьюмерам читать одновременно.
— уникальный номер события внутри партиции. Как закладка в книге: offset 0 = первое событие, offset 100 = сотое событие. Consumer запоминает свой offset (например, "я прочитал до 150-го события") и может продолжить с этого места после перезапуска, или "перемотать назад" к offset 50.
— приложение, которое отправляет события в Kafka.
— приложение, которое читает события из Kafka.
— группа консьюмеров, которые делят между собой партиции. Kafka гарантирует, что одна партиция читается только одним консьюмером из группы.
— сервер Kafka. Кластер Kafka = несколько брокеров для отказоустойчивости.
— координатор кластера. В Kafka 3.0+ KRaft заменяет ZooKeeper.
Гарантии доставки
Kafka предлагает три уровня гарантий:
1. At most once (максимум один раз) — можем потерять:
# Producer отправил, но не дождался подтверждения
producer.send('topic', value, acks=0) # Fire and forget2. At least once (минимум один раз) — можем продублировать:
# Producer ждёт подтверждения, но может повторить при таймауте
producer.send('topic', value, acks=1) # Leader подтвердил3. Exactly once (ровно один раз) — :
# Идемпотентный producer + транзакции
producer = KafkaProducer(
enable_idempotence=True,
transactional_id='my-transactional-id'
)Для большинства бизнес-кейсов достаточно at least once + идемпотентная обработка на стороне консьюмера.
Зачем Kafka нужен Python-разработчику
Типичные сценарии
1. Асинхронная обработка (замена Celery):
Проблема: API должен отвечать быстро, но нужно выполнить медленные операции (email, интеграции).
# ❌ Плохо: пользователь ждёт завершения всех операций
@app.post("/users/register")
async def register(user: UserCreate):
db_user = await create_user(user) # 50ms
await send_welcome_email(user.email) # 500ms — ждём SMTP
await create_bonus_account(db_user.id) # 200ms — запрос в другую БД
return db_user # Пользователь ждал 750ms вместо 50msС Kafka API отвечает мгновенно, а фоновый consumer обрабатывает:
# ✅ Хорошо: API отвечает быстро, обработка в фоне
@app.post("/users/register")
async def register(user: UserCreate):
db_user = await create_user(user) # 50ms
# Отправили событие в Kafka — занимает ~5ms
await producer.send('users.registered', {
'user_id': db_user.id,
'email': user.email,
'timestamp': datetime.utcnow().isoformat()
})
return db_user # Ответ за 55ms вместо 750ms
# Отдельный consumer (можно в том же приложении или отдельном)
async def handle_user_registered(event):
# Обрабатываем в фоне, не блокируя API
await send_welcome_email(event['email'])
await create_bonus_account(event['user_id'])
# Если упадёт — повторим автоматически (at-least-once)Преимущества над Celery:
- Kafka хранит события на диске → не потеряем при перезапуске
- Replay событий для отладки или пересоздания состояния
- Несколько consumer groups могут читать одни события
- Проще масштабирование (добавили партиции → добавили consumers)
2. Микросервисная коммуникация (отказ от прямых HTTP вызовов):
Раньше сервис Order напрямую вызывал Inventory и Notification через HTTP:
# ❌ Плохо: Order Service жёстко связан с другими сервисами
@app.post("/orders")
async def create_order(order: OrderCreate):
db_order = await save_order(order)
# Прямой HTTP вызов → если Inventory упал, заказ не создаётся
await http_client.post("http://inventory-service/reserve", {...})
await http_client.post("http://notification-service/send", {...})
return db_orderС Kafka сервисы не знают друг о друге — они реагируют на события:
# ✅ Хорошо: Order Service публикует событие и забывает
@app.post("/orders")
async def create_order(order: OrderCreate):
db_order = await save_order(order)
# Отправили событие → ответили пользователю мгновенно
await producer.send('orders.created', {
'order_id': db_order.id,
'user_id': order.user_id,
'items': order.items,
'total': order.total
})
return db_order # Не ждём Inventory и Notification
# Inventory Service — отдельное приложение, которое слушает события
async def handle_order_created(event):
await reserve_items(event['items'])
# Если упадёт — не повлияет на создание заказа
# Notification Service — ещё одно отдельное приложение
async def handle_order_created(event):
await send_order_confirmation(event['order_id'])
# Можно добавить новый сервис, не меняя Order ServiceПреимущества:
- Order Service не ломается, если Inventory недоступен
- Можно добавить Analytics Service без изменения Order Service
- Каждый сервис масштабируется независимо
- События хранятся → можно восстановить состояние при сбое
3. :
Проблема: в обычной БД видим только текущее состояние, теряем историю изменений.
# ❌ Традиционный подход: UPDATE затирает историю
@app.post("/transfer")
async def transfer_money(from_account: str, to_account: str, amount: float):
# Было: 5000 руб
await db.execute(
"UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, from_account)
)
# Стало: 4000 руб
# Потеряли информацию: кто, когда, зачем списал 1000 рубEvent Sourcing: вместо изменения состояния пишем факты (события):
# ✅ Event Sourcing: храним все события
@app.post("/transfer")
async def transfer_money(from_account: str, to_account: str, amount: float):
transfer_id = generate_uuid()
# Событие 1: Списание
await producer.send('accounts.debited', {
'transfer_id': transfer_id,
'account_id': from_account,
'amount': -1000,
'timestamp': datetime.utcnow().isoformat(),
'description': f'Перевод на {to_account}'
})
# Событие 2: Зачисление
await producer.send('accounts.credited', {
'transfer_id': transfer_id,
'account_id': to_account,
'amount': 1000,
'timestamp': datetime.utcnow().isoformat(),
'description': f'Перевод от {from_account}'
})
# Consumer восстанавливает текущее состояние из событий
async def rebuild_balance(account_id: str) -> float:
events = await get_account_events(account_id) # Читаем из Kafka
balance = 0
for event in events:
if event['type'] == 'debited':
balance -= event['amount']
elif event['type'] == 'credited':
balance += event['amount']
return balanceПреимущества:
- Полная история: можем ответить "почему баланс такой?"
- Аудит из коробки: кто, когда, сколько
- Replay: пересоздать состояние на любую дату
- Отладка: воспроизвести ошибку из событий
- Compliance: для финтеха и банков это требование
4. Real-time аналитика:
Проблема: нужно считать метрики в реальном времени (популярные страницы, активные пользователи), но запросы к БД медленные.
# ❌ Традиционный подход: пишем каждый клик в БД
@app.get("/products/{item_id}")
async def get_product(item_id: int, user_id: int):
# Записываем клик в PostgreSQL
await db.execute(
"INSERT INTO page_views (user_id, page, timestamp) VALUES (?, ?, ?)",
(user_id, f'/products/{item_id}', datetime.utcnow())
)
# При 1000 RPS это 1000 INSERT/s в БД — узкое горло
# Получаем популярные товары — медленный запрос
popular = await db.execute(
"SELECT page, COUNT(*) FROM page_views WHERE timestamp > now() - interval '1 hour' GROUP BY page"
)
# На больших объёмах это секунды ожиданияС Kafka можно агрегировать метрики в памяти consumer:
# ✅ Хорошо: пишем события в Kafka, агрегируем в памяти
@app.get("/products/{item_id}")
async def get_product(item_id: int, user_id: int):
# Просто отправили событие — занимает 1-5ms
await producer.send('user.clicks', {
'user_id': user_id,
'page': f'/products/{item_id}',
'timestamp': time.time(),
'session_id': request.cookies.get('session_id')
})
# Читаем популярные товары из Redis (обновляется consumer)
popular = await redis.get('popular_products_1h') # < 1ms
return popular
# Consumer агрегирует клики в реальном времени
page_counters = defaultdict(int) # В памяти
async def process_clicks(event):
page = event['page']
# Инкрементируем счётчик в памяти
page_counters[page] += 1
# Раз в 10 секунд сбрасываем в Redis
if should_flush():
top_pages = sorted(page_counters.items(), key=lambda x: x[1], reverse=True)[:10]
await redis.set('popular_products_1h', json.dumps(top_pages))
# Трекаем путь пользователя для рекомендаций
await track_user_journey(event['user_id'], page)Преимущества:
- Разгрузили БД: вместо 1000 INSERT/s → запись в Kafka
- Real-time: счётчики обновляются каждые 10 секунд, а не раз в час
- Масштабирование: добавили партиций → добавили consumers
- Несколько consumer groups: один для аналитики, другой для рекомендаций
Когда Kafka избыточен
НЕ используйте Kafka, если:
- У вас монолит без планов на микросервисы
- Нагрузка < 100 событий в секунду (хватит Redis/RabbitMQ)
- Нужны транзакции между сервисами (используйте Saga pattern)
- Команда не готова поддерживать ещё одну инфраструктуру
- Критична строгая очерёдность событий (Kafka гарантирует порядок только внутри партиции)
Используйте Kafka, если:
- Нужна горизонтальная масштабируемость
- Критична отказоустойчивость и durability
- Есть множество консьюмеров одних событий
- Нужен replay (пересмотр истории событий)
- Планируете event sourcing или
Установка и запуск Kafka
Docker Compose (рекомендуемый способ)
Создайте docker-compose.yml:
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181Запуск:
docker compose up -d
# Проверка статуса
docker compose ps
# Логи
docker compose logs -f kafka
# UI для мониторинга: http://localhost:8080В production используйте минимум 3 брокера для отказоустойчивости. Docker Compose подходит только для dev/testing.
Создание топика
Топик — это логическая категория событий в Kafka. Перед началом работы нужно создать топики, в которые будем писать события.
Создадим топик orders с 3 партициями:
# Через docker exec
docker compose exec kafka kafka-topics \
--create \
--topic orders \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1Что означают параметры:
--topic orders— имя топика (категория событий)--partitions 3— количество партиций для параллельной обработки. 3 партиции = до 3 консьюмеров могут читать одновременно--replication-factor 1— количество копий данных (1 = без резервирования). В production используйте минимум 2-3
Проверим, что топик создан:
# Просмотр всех топиков
docker compose exec kafka kafka-topics \
--list \
--bootstrap-server localhost:9092
# Детали топика: партиции, реплики, конфигурация
docker compose exec kafka kafka-topics \
--describe \
--topic orders \
--bootstrap-server localhost:9092
# Вывод:
# Topic: orders PartitionCount: 3 ReplicationFactor: 1
# Partition: 0 Leader: 1 Replicas: 1 Isr: 1
# Partition: 1 Leader: 1 Replicas: 1 Isr: 1
# Partition: 2 Leader: 1 Replicas: 1 Isr: 1Топики можно создавать автоматически при первой отправке события, если
включена опция auto.create.topics.enable=true. Но в production лучше
создавать топики явно с нужными параметрами.
Python клиенты для Kafka
Сравнение библиотек
| Библиотека | Async | Typed | Production Ready | Когда использовать |
|---|---|---|---|---|
| aiokafka | ✅ | ⚠️ | ✅ | FastAPI, async Python |
| confluent-kafka-python | ❌ | ✅ | ✅ | Sync код, максимальная производительность |
| kafka-python | ❌ | ❌ | ⚠️ | Legacy проекты (не активно поддерживается) |
| faust | ✅ | ✅ | ✅ | Stream processing (альтернатива Kafka Streams) |
Для FastAPI выбираем aiokafka — нативная поддержка async/await и интеграция с asyncio event loop.
Установка aiokafka
pip install aiokafka
# Для сериализации JSON
pip install aiokafka orjson
# Для Avro схем (optional, для production)
pip install aiokafka fastavroЧто такое ?
В примерах статьи мы используем JSON для простоты:
await producer.send('orders', {'order_id': 123, 'total': 5000})Проблемы JSON в production:
- ❌ Много лишних байт:
{"order_id":123}— имена полей в каждом событии - ❌ Нет контроля схемы: можно отправить
{"order_id": "abc"}(строка вместо числа) - ❌ Сложная эволюция: добавили поле → старые consumer ломаются
Avro решает эти проблемы:
- ✅ Компактный бинарный формат:
{"order_id":123}(JSON 17 байт) → 5 байт (Avro) - ✅ Строгая типизация: schema валидирует данные перед отправкой
- ✅ Эволюция схемы: можно добавлять поля с default значениями
Как работает Avro:
- Определяем схему (один раз):
{
"type": "record",
"name": "Order",
"fields": [
{ "name": "order_id", "type": "int" },
{ "name": "total", "type": "float" },
{ "name": "status", "type": "string", "default": "pending" }
]
}- Отправляем события (схема не передаётся, только ID + данные):
from fastavro import schemaless_writer
# Вместо 50 байт JSON → 10 байт Avro
await producer.send('orders', avro_serialize(order_data, schema))- Schema Registry хранит версии схем:
- v1:
order_id,total - v2:
order_id,total,status(новое поле с default) - Старые consumers читают v1 и v2 без проблем
- v1:
Когда использовать Avro:
- Production системы с большим объёмом данных (экономия трафика)
- Множество команд работают с одним топиком (контракт схемы)
- Долгоживущие системы (эволюция без breaking changes)
Для обучения и MVP достаточно JSON. Переход на Avro — это оптимизация на этапе масштабирования.
Интеграция Kafka с FastAPI
Архитектура приложения
kafka-fastapi/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app
│ ├── config.py # Настройки Kafka
│ ├── kafka/
│ │ ├── __init__.py
│ │ ├── producer.py # Kafka Producer
│ │ ├── consumer.py # Kafka Consumer
│ │ └── topics.py # Топики и схемы
│ ├── models.py # Pydantic схемы
│ ├── routers/
│ │ └── orders.py # API endpoints
│ └── consumers/
│ └── order_handler.py # Обработчики событий
├── tests/
│ ├── conftest.py
│ └── test_kafka.py
├── docker-compose.yml
├── requirements.txt
└── README.md
Настройки (app/config.py)
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
# Kafka
kafka_bootstrap_servers: list[str] = ["localhost:9092"]
kafka_consumer_group: str = "fastapi-app"
kafka_auto_offset_reset: str = "earliest" # "earliest" или "latest"
# Topics
orders_topic: str = "orders"
notifications_topic: str = "notifications"
# App
app_name: str = "Kafka FastAPI"
debug: bool = False
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
settings = Settings()Что такое auto_offset_reset и когда что использовать?
Параметр auto_offset_reset определяет, с какого места читать топик, если для consumer group нет сохранённого offset (первый запуск или offset устарел).
earliest — читать с самого начала топика:
kafka_auto_offset_reset: str = "earliest"Когда использовать:
- ✅ Первый запуск consumer — нужно обработать все исторические события
- ✅ Event Sourcing — восстанавливаем состояние из всех событий
- ✅ Аналитика — считаем метрики за всё время
- ✅ Dev/Testing — хотим видеть все тестовые события
Плюсы:
- Не потеряем ни одного события
- Можно восстановить состояние с нуля
Минусы:
- При первом запуске обработает ВСЕ события в топике (может быть миллионы)
- Долгий старт, если топик большой
latest — читать только новые события:
kafka_auto_offset_reset: str = "latest"Когда использовать:
- ✅ Production — не нужна история, только текущие события
- ✅ Уведомления — отправляем только новые, старые неактуальны
- ✅ Real-time мониторинг — интересны только свежие метрики
- ✅ После долгого downtime — не хотим обрабатывать накопившиеся события
Плюсы:
- Быстрый старт — читаем только с момента подключения
- Не перегружаем систему при первом запуске
Минусы:
- Потеряем все события, которые были до запуска consumer
- Не подходит для Event Sourcing
Практические примеры:
# Сервис уведомлений — только новые события
kafka_auto_offset_reset: str = "latest"
# Запустили → получаем только свежие заказы → отправляем email
# Сервис аналитики — вся история
kafka_auto_offset_reset: str = "earliest"
# Запустили → обработали все клики за месяц → построили отчёт
# Сервис обработки заказов — зависит от ситуации
kafka_auto_offset_reset: str = "earliest" # Dev: обработаем тестовые заказы
kafka_auto_offset_reset: str = "latest" # Prod: только новые после деплояВажно: После первого запуска и commit offset параметр auto_offset_reset больше не влияет — consumer продолжает с сохранённого offset. Параметр работает только когда offset отсутствует.
Проверить текущий offset:
docker compose exec kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group fastapi-appKafka Producer (app/kafka/producer.py)
import json
import logging
from typing import Any, Optional
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
from app.config import settings
logger = logging.getLogger(__name__)
class KafkaProducerClient:
"""
Singleton Kafka Producer для отправки событий.
Использование:
producer = await get_kafka_producer()
await producer.send('orders', {'order_id': 123})
"""
def __init__(self):
self.producer: Optional[AIOKafkaProducer] = None
async def start(self):
"""Создаёт и запускает producer"""
self.producer = AIOKafkaProducer(
bootstrap_servers=settings.kafka_bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
# Гарантии доставки
acks='all', # Ждём подтверждения от всех реплик
retries=3, # Повторные попытки
# Производительность
compression_type='gzip',
max_batch_size=16384,
linger_ms=10, # Ждём 10ms для батчинга
)
await self.producer.start()
logger.info("Kafka Producer started")
async def stop(self):
"""Останавливает producer"""
if self.producer:
await self.producer.stop()
logger.info("Kafka Producer stopped")
async def send(
self,
topic: str,
value: dict[str, Any],
key: Optional[str] = None,
) -> None:
"""
Отправляет событие в Kafka.
Args:
topic: Имя топика
value: Payload события (будет сериализован в JSON)
key: Ключ для партиционирования (optional)
"""
if not self.producer:
raise RuntimeError("Producer not started")
try:
# Ключ определяет партицию (события с одним ключом → одна партиция)
key_bytes = key.encode('utf-8') if key else None
await self.producer.send_and_wait(
topic=topic,
value=value,
key=key_bytes,
)
logger.debug(f"Sent to {topic}: {value}")
except KafkaError as e:
logger.error(f"Failed to send to {topic}: {e}")
raise
# Singleton instance
_producer_client: Optional[KafkaProducerClient] = None
async def get_kafka_producer() -> KafkaProducerClient:
"""Dependency injection для FastAPI"""
global _producer_client
if _producer_client is None:
_producer_client = KafkaProducerClient()
await _producer_client.start()
return _producer_clientПотокобезопасность Producer: почему синглетон безопасен в asyncio
Вопрос: Singleton producer + параллельные запросы = проблемы с многопоточностью?
Ответ: В asyncio это безопасно, но есть нюансы:
✅ Безопасно в asyncio (FastAPI по умолчанию):
FastAPI работает в одном event loop, все запросы обрабатываются в одном потоке через корутины:
# Все эти запросы выполняются в одном потоке
async def endpoint1(): await producer.send(...) # Корутина 1
async def endpoint2(): await producer.send(...) # Корутина 2
async def endpoint3(): await producer.send(...) # Корутина 3
# asyncio переключается между корутинами, но всё в одном потокеПочему это работает:
aiokafka.AIOKafkaProducerнаписан для asyncio- Использует
awaitдля переключения корутин, а не блокирующие операции - Один producer обслуживает тысячи параллельных
send()без проблем
❌ НЕ безопасно при реальной многопоточности:
Если запускаете FastAPI с несколькими workers через uvicorn:
# ❌ Плохо: 4 процесса = 4 event loop = race condition на синглетоне
uvicorn app.main:app --workers 4Каждый worker — это отдельный процесс с собственным синглетоном, поэтому проблем нет. Но если используете threading внутри приложения:
# ❌ ОПАСНО: threading + общий producer
import threading
def send_in_thread():
# aiokafka НЕ thread-safe!
producer.send(...) # Race condition
thread = threading.Thread(target=send_in_thread)
thread.start()Решение для threading:
Используйте confluent-kafka-python (синхронный, thread-safe):
from confluent_kafka import Producer
# Thread-safe producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
def send_in_thread():
producer.produce('topic', value=b'data')
producer.flush() # Дождаться отправкиИли создавайте producer в каждом потоке:
async def send_in_thread():
# Каждый поток создаёт свой producer
local_producer = AIOKafkaProducer(...)
await local_producer.start()
await local_producer.send(...)
await local_producer.stop()Практические рекомендации:
- FastAPI + uvicorn (один worker) → синглетон producer безопасен ✅
- FastAPI + uvicorn (несколько workers) → каждый worker имеет свой синглетон ✅
- FastAPI + threading/multiprocessing внутри → используйте
confluent-kafkaили локальные producers ⚠️ - Celery workers + Kafka → каждый worker создаёт свой producer при старте ✅
Проверка потокобезопасности:
import asyncio
async def test_concurrent_sends():
"""Тест параллельной отправки"""
producer = await get_kafka_producer()
# 1000 параллельных отправок
tasks = [
producer.send('test', {'id': i})
for i in range(1000)
]
await asyncio.gather(*tasks)
print("✅ Все 1000 событий отправлены без ошибок")Итог: Для FastAPI + aiokafka синглетон producer — правильное решение. Проблемы возникают только при смешивании asyncio с threading, что редко нужно в реальных приложениях.
Kafka Consumer (app/kafka/consumer.py)
import asyncio
import json
import logging
from typing import Callable, Awaitable
from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError
from app.config import settings
logger = logging.getLogger(__name__)
class KafkaConsumerClient:
"""
Kafka Consumer для чтения событий.
Использование:
consumer = KafkaConsumerClient('orders', handler_func)
await consumer.start()
"""
def __init__(
self,
topic: str,
handler: Callable[[dict], Awaitable[None]],
group_id: Optional[str] = None,
):
self.topic = topic
self.handler = handler
self.group_id = group_id or settings.kafka_consumer_group
self.consumer: Optional[AIOKafkaConsumer] = None
self._running = False
async def start(self):
"""Создаёт consumer и начинает обработку"""
self.consumer = AIOKafkaConsumer(
self.topic,
bootstrap_servers=settings.kafka_bootstrap_servers,
group_id=self.group_id,
auto_offset_reset=settings.kafka_auto_offset_reset,
enable_auto_commit=False, # Ручной commit для надёжности
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
await self.consumer.start()
logger.info(f"Consumer started for topic: {self.topic}")
self._running = True
asyncio.create_task(self._consume_loop())
async def stop(self):
"""Останавливает consumer"""
self._running = False
if self.consumer:
await self.consumer.stop()
logger.info(f"Consumer stopped for topic: {self.topic}")
async def _consume_loop(self):
"""Основной цикл чтения событий"""
try:
async for message in self.consumer:
try:
# Обрабатываем событие
await self.handler(message.value)
# Commit offset после успешной обработки
await self.consumer.commit()
logger.debug(
f"Processed message from {self.topic}: "
f"partition={message.partition}, offset={message.offset}"
)
except Exception as e:
logger.error(f"Error processing message: {e}", exc_info=True)
# Не делаем commit → сообщение будет перечитано
except KafkaError as e:
logger.error(f"Kafka error: {e}")
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)FastAPI Application (app/main.py)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.config import settings
from app.kafka.producer import get_kafka_producer, _producer_client
from app.consumers.order_handler import start_order_consumer, stop_order_consumer
from app.routers import orders
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifecycle: startup and shutdown"""
# Startup: запускаем Kafka Producer
await get_kafka_producer()
# Startup: запускаем Kafka Consumers
await start_order_consumer()
yield
# Shutdown: останавливаем Producer и Consumers
if _producer_client:
await _producer_client.stop()
await stop_order_consumer()
app = FastAPI(
title=settings.app_name,
lifespan=lifespan,
)
# Подключаем роутеры
app.include_router(orders.router)
@app.get("/health")
async def health():
"""Health check"""
return {"status": "healthy"}API Endpoints (app/routers/orders.py)
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from app.kafka.producer import KafkaProducerClient, get_kafka_producer
from app.config import settings
router = APIRouter(prefix="/orders", tags=["orders"])
class OrderCreate(BaseModel):
"""Схема создания заказа"""
user_id: int
items: list[str] = Field(..., min_items=1)
total_amount: float = Field(..., gt=0)
class OrderResponse(BaseModel):
"""Ответ API"""
order_id: int
status: str = "pending"
message: str = "Order is being processed"
@router.post("/", response_model=OrderResponse, status_code=status.HTTP_202_ACCEPTED)
async def create_order(
order: OrderCreate,
producer: KafkaProducerClient = Depends(get_kafka_producer),
):
"""
Создаёт заказ и отправляет событие в Kafka.
Возвращает 202 Accepted — обработка асинхронная.
"""
# Генерируем ID (в реальности — из БД)
order_id = 12345 # Заглушка
# Отправляем событие в Kafka
await producer.send(
topic=settings.orders_topic,
value={
"event_type": "order.created",
"order_id": order_id,
"user_id": order.user_id,
"items": order.items,
"total_amount": order.total_amount,
},
key=str(order.user_id), # Партиция по user_id
)
return OrderResponse(order_id=order_id)Event Handler (app/consumers/order_handler.py)
import logging
from app.kafka.consumer import KafkaConsumerClient
from app.config import settings
logger = logging.getLogger(__name__)
# Глобальная переменная для consumer
_order_consumer: Optional[KafkaConsumerClient] = None
async def handle_order_created(event: dict):
"""
Обработчик события order.created.
Здесь выполняются все side-effects:
- Отправка email
- Обновление склада
- Начисление бонусов
"""
order_id = event["order_id"]
user_id = event["user_id"]
logger.info(f"Processing order {order_id} for user {user_id}")
try:
# Эмуляция работы
await send_order_email(user_id, order_id)
await update_inventory(event["items"])
await calculate_bonuses(user_id, event["total_amount"])
logger.info(f"Order {order_id} processed successfully")
except Exception as e:
logger.error(f"Failed to process order {order_id}: {e}")
raise # Retry через Kafka
async def send_order_email(user_id: int, order_id: int):
"""Заглушка отправки email"""
logger.info(f"Email sent to user {user_id} for order {order_id}")
async def update_inventory(items: list[str]):
"""Заглушка обновления склада"""
logger.info(f"Inventory updated for items: {items}")
async def calculate_bonuses(user_id: int, amount: float):
"""Заглушка начисления бонусов"""
bonuses = amount * 0.05
logger.info(f"Bonuses {bonuses} calculated for user {user_id}")
async def start_order_consumer():
"""Запускает consumer для обработки заказов"""
global _order_consumer
_order_consumer = KafkaConsumerClient(
topic=settings.orders_topic,
handler=handle_order_created,
)
await _order_consumer.start()
async def stop_order_consumer():
"""Останавливает consumer"""
if _order_consumer:
await _order_consumer.stop()Запуск приложения
# Запускаем Kafka
docker compose up -d
# Запускаем FastAPI
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# Тестируем API
curl -X POST http://localhost:8000/orders/ \
-H "Content-Type: application/json" \
-d '{
"user_id": 123,
"items": ["item1", "item2"],
"total_amount": 5000
}'
# Смотрим логи consumer
# Должны увидеть: "Processing order 12345 for user 123"Подводные камни и решения
1. Порядок обработки событий
Проблема: События из разных партиций обрабатываются параллельно.
# User 123: два события в разных партициях
await producer.send('orders', {'user_id': 123, 'action': 'create'}) # → partition 0
await producer.send('orders', {'user_id': 123, 'action': 'cancel'}) # → partition 2
# Consumer может обработать 'cancel' раньше 'create'!Решение: Используйте ключ партиции для группировки связанных событий:
# Все события user_id=123 попадут в одну партицию
await producer.send(
topic='orders',
value={'user_id': 123, 'action': 'create'},
key=str(123) # Ключ определяет партицию
)Kafka гарантирует порядок внутри партиции, но не между партициями.
2. Идемпотентность обработки
Проблема: At-least-once delivery → событие может обработаться дважды.
async def handle_payment(event):
# ❌ Плохо: повторная обработка = двойное списание
await charge_user(event['user_id'], event['amount'])Решение: Делайте обработку идемпотентной:
async def handle_payment(event):
payment_id = event['payment_id']
# Проверяем, обработано ли уже
if await is_payment_processed(payment_id):
logger.info(f"Payment {payment_id} already processed, skipping")
return
# Обрабатываем
await charge_user(event['user_id'], event['amount'])
# Сохраняем факт обработки
await mark_payment_processed(payment_id)3. Dead Letter Queue (DLQ)
Проблема: Событие не обрабатывается (ошибка валидации, bug) → блокирует партицию.
Решение: Отправляйте проблемные события в :
async def _consume_loop(self):
"""Цикл с обработкой ошибок"""
async for message in self.consumer:
try:
await self.handler(message.value)
await self.consumer.commit()
except ValidationError as e:
# Невалидное событие → в DLQ
logger.error(f"Validation error: {e}")
await self._send_to_dlq(message, error=str(e))
await self.consumer.commit() # Пропускаем
except Exception as e:
# Неожиданная ошибка → retry
logger.error(f"Processing error: {e}")
# Не делаем commit → Kafka повторит
async def _send_to_dlq(self, message, error: str):
"""Отправляет в Dead Letter Queue"""
await self.dlq_producer.send(
topic=f"{self.topic}.dlq",
value={
"original_message": message.value,
"error": error,
"timestamp": datetime.utcnow().isoformat(),
},
)4. Backpressure (перегрузка consumer)
Проблема: Producer пишет быстрее, чем consumer читает → растёт .
Мониторинг lag:
docker compose exec kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group fastapi-app
# Вывод:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# orders 0 1000 5000 4000 ← Проблема!Решение 1: Добавить consumer в группу (горизонтальное масштабирование):
# Запускаем несколько инстансов приложения
# Kafka автоматически распределит партиции между нимиРешение 2: Увеличить количество партиций:
docker compose exec kafka kafka-topics \
--alter \
--topic orders \
--partitions 6 \
--bootstrap-server localhost:9092Решение 3: Batch processing:
async def _consume_loop(self):
"""Обрабатываем батчами"""
batch = []
batch_size = 100
async for message in self.consumer:
batch.append(message)
if len(batch) >= batch_size:
await self._process_batch(batch)
await self.consumer.commit()
batch = []
async def _process_batch(self, messages):
"""Параллельная обработка батча"""
tasks = [self.handler(msg.value) for msg in messages]
await asyncio.gather(*tasks, return_exceptions=True)5. Rebalancing (перебалансировка)
Проблема: При добавлении/удалении consumer Kafka перераспределяет партиции → обработка останавливается на несколько секунд ().
Решение: Graceful shutdown:
import signal
async def shutdown(signal, loop):
"""Graceful shutdown при SIGTERM"""
logger.info(f"Received exit signal {signal.name}...")
# Останавливаем consumer (завершает текущие сообщения)
await stop_order_consumer()
# Останавливаем producer
if _producer_client:
await _producer_client.stop()
# Останавливаем event loop
loop.stop()
# Регистрируем обработчики
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda s=sig: asyncio.create_task(shutdown(s, loop))
)Тестирование Kafka
Unit тесты (моки)
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_create_order_sends_event():
"""Проверяем, что API отправляет событие в Kafka"""
# Мокируем producer
mock_producer = AsyncMock()
with patch('app.routers.orders.get_kafka_producer', return_value=mock_producer):
from app.routers.orders import create_order
order = OrderCreate(user_id=123, items=["item1"], total_amount=100)
response = await create_order(order, producer=mock_producer)
# Проверяем вызов
mock_producer.send.assert_called_once()
call_args = mock_producer.send.call_args
assert call_args.kwargs['topic'] == 'orders'
assert call_args.kwargs['value']['user_id'] == 123
assert response.order_id > 0Integration тесты (Testcontainers)
import pytest
from testcontainers.kafka import KafkaContainer
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
@pytest.fixture(scope="session")
def kafka_container():
"""Запускает Kafka в Docker для тестов"""
with KafkaContainer() as kafka:
yield kafka
@pytest.mark.asyncio
async def test_kafka_producer_consumer(kafka_container):
"""E2E тест: producer → Kafka → consumer"""
bootstrap_servers = kafka_container.get_bootstrap_server()
# Producer
producer = AIOKafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode(),
)
await producer.start()
# Consumer
consumer = AIOKafkaConsumer(
'test-topic',
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode()),
)
await consumer.start()
# Отправляем событие
test_message = {'test': 'data'}
await producer.send_and_wait('test-topic', test_message)
# Читаем событие
async for message in consumer:
assert message.value == test_message
break # Прочитали одно сообщение → выходим
await producer.stop()
await consumer.stop()Локальное тестирование с Kafka UI
- Откройте http://localhost:8080 (Kafka UI из Docker Compose)
- Выберите топик
orders - Вручную отправьте тестовое событие через UI
- Проверьте логи consumer в приложении
Мониторинг и наблюдаемость
Ключевые метрики
Producer метрики:
record-send-rate— событий в секундуrecord-error-rate— ошибки отправкиrequest-latency-avg— задержка отправки
Consumer метрики:
records-consumed-rate— событий в секундуrecords-lag-max— максимальный lag (критично!)commit-latency-avg— задержка
Kafka метрики:
- Disk usage (events хранятся на диске)
- Network I/O ()
- (проблемы с репликацией)
Prometheus + Grafana
Добавьте JMX Exporter для экспорта метрик Kafka в Prometheus:
# Добавьте в docker-compose.yml
kafka:
environment:
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
ports:
- "9101:9101"
jmx-exporter:
image: sscaling/jmx-prometheus-exporter
ports:
- "5556:5556"
environment:
SERVICE_PORT: 5556
volumes:
- ./jmx-exporter-config.yml:/etc/jmx-exporter/config.yml
command:
- "5556"
- "/etc/jmx-exporter/config.yml"Grafana dashboard: Используйте готовый Kafka Overview dashboard.
Структурное логирование
import structlog
logger = structlog.get_logger()
async def handle_order_created(event: dict):
"""Обработчик с structured logging"""
logger.info(
"order.processing.started",
order_id=event["order_id"],
user_id=event["user_id"],
total_amount=event["total_amount"],
)
try:
await process_order(event)
logger.info(
"order.processing.completed",
order_id=event["order_id"],
duration_ms=123,
)
except Exception as e:
logger.error(
"order.processing.failed",
order_id=event["order_id"],
error=str(e),
exc_info=True,
)
raiseСтруктурные логи легко парсить в ELK/Loki для аналитики.
Production Checklist
Перед деплоем
- : минимум 3 брокера, replication factor >= 2
- : настройте
log.retention.hours(default: 168h = 7 дней) - Партиции: рассчитайте количество (2-3x количество консьюмеров)
- Acks:
acks=allдля критичных данных - Idempotence: включите
enable.idempotence=True - : используйте
gzipилиsnappy - Monitoring: настройте алерты на lag и error rate
- DLQ: реализуйте Dead Letter Queue
- Graceful shutdown: обрабатывайте SIGTERM
- Тесты: integration тесты с Testcontainers
Безопасность
# SSL/TLS encryption
kafka:
environment:
KAFKA_SECURITY_PROTOCOL: SSL
KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD: changeit
# SASL Authentication
KAFKA_SASL_MECHANISM: PLAIN
KAFKA_SASL_ENABLED_MECHANISMS: PLAINCapacity planning
Формула throughput:
Max Throughput = (Partitions × Consumer Processing Rate)
Пример: 10 партиций, каждый consumer обрабатывает 100 msg/s → max 1000 msg/s.
Формула storage:
Storage = (Events per day × Avg event size × Retention days)
Пример: 10M событий/день × 1KB × 7 дней = 70GB.
Итоги
Вы научились:
- Понимать архитектуру Kafka — топики, партиции, consumer groups
- Интегрировать с FastAPI — async producer и consumer
- Обрабатывать события — идемпотентность, retry, DLQ
- Избегать подводных камней — порядок, rebalancing, backpressure
- Тестировать — unit, integration, e2e
- Мониторить — метрики, логи, алерты
Следующие шаги:
- Изучите Kafka Streams для stream processing
- Попробуйте Schema Registry (Avro) для версионирования схем
- Изучите KSQL для SQL-запросов к событиям
- Реализуйте Saga pattern для распределённых транзакций
- Настройте multi-region replication для disaster recovery
Полезные ссылки:
- Apache Kafka Documentation
- aiokafka Documentation
- Confluent Python Client
- Designing Event-Driven Systems — книга от Confluent
Kafka — мощный инструмент для построения масштабируемых distributed систем. Начните с простого use case (async notifications), освойте паттерны, и постепенно переходите к event sourcing и .
Kafka превращает ваш монолит в event-driven систему за неделю, но потребует месяцы на освоение всех нюансов. Начинайте с одного топика и одного consumer — усложняйте постепенно.
Глоссарий
Ключевые термины, используемые в статье:
- Идемпотентность
- Свойство операции, которую можно выполнить многократно без изменения результата после первого применения. Например: установка значения x=5 идемпотентна, а x=x+5 — нет. В Kafka идемпотентность защищает от дубликатов при retry.
- Acknowledgement (ACK)
- Подтверждение получения сообщения. Producer может требовать ACK от лидера партиции (acks=1), от всех реплик (acks=all) или не ждать вообще (acks=0).
- Apache Avro
- Бинарный формат сериализации данных от Apache (назван в честь британского производителя самолётов Avro). В отличие от JSON, Avro компактнее (меньше размер), быстрее (бинарный формат) и поддерживает эволюцию схемы. Схема хранится отдельно в Schema Registry, в событиях передаётся только ID схемы + данные.
- Broker
- Сервер Kafka, который хранит данные и обрабатывает запросы от producer и consumer. Кластер Kafka состоит из нескольких брокеров для отказоустойчивости и масштабирования.
- Commit
- Операция сохранения offset в Kafka, которая помечает событие как обработанное. Commit можно делать автоматически (auto.commit) или вручную после успешной обработки для гарантии at-least-once.
- Consumer
- Приложение (или его часть), которое читает события из Kafka. Consumer подписывается на топики, читает события с определённых партиций и коммитит offset после обработки.
- Consumer Group
- Логическая группа консьюмеров, которые делят между собой партиции одного топика. Kafka автоматически распределяет партиции между консьюмерами группы. При добавлении/удалении консьюмера происходит rebalancing.
- CQRS
- Command Query Responsibility Segregation — паттерн разделения моделей для записи (Command) и чтения (Query). Команды изменяют состояние через события, а запросы читают из оптимизированных read-моделей. Kafka идеально подходит для CQRS: события в топиках = команды, consumer строят read-модели в БД/Redis.
- Dead Letter Queue (DLQ)
- Специальный топик для событий, которые не удалось обработать после всех retry попыток. Позволяет не блокировать обработку здоровых событий и анализировать проблемы отдельно.
- Event Sourcing
- Паттерн хранения данных, где вместо текущего состояния сохраняется последовательность событий. Состояние восстанавливается проигрыванием всех событий. Даёт полную историю изменений и возможность replay.
- Lag
- Отставание consumer от последнего события в партиции. Измеряется в количестве необработанных событий. Высокий lag (тысячи событий) означает, что consumer не успевает обрабатывать поток данных.
- Latency
- Задержка между отправкой запроса и получением ответа. Измеряется в миллисекундах (ms). В высоконагруженных системах важны процентили: p50, p95, p99.
- Offset
- Уникальный порядковый номер события внутри партиции. Начинается с 0 и увеличивается при каждом новом событии. Consumer хранит свой offset чтобы знать, до какого места он прочитал топик.
- Persistence
- Сохранение данных на диск для долговременного хранения. В Kafka события записываются на диск брокера и могут храниться днями/неделями, в отличие от in-memory очередей.
- Producer
- Приложение (или его часть), которое отправляет события в Kafka. Producer выбирает топик и опционально ключ партиции, затем сериализует данные и отправляет брокеру.
- Rebalancing
- Процесс перераспределения партиций между консьюмерами в Consumer Group. Происходит при добавлении/удалении консьюмера или изменении количества партиций. Во время rebalancing обработка событий приостанавливается.
- Replay
- Возможность перечитать события из прошлого, 'перемотав' offset назад. Полезно для отладки, восстановления данных или создания новых проекций из исторических событий.
- Replication Factor
- Количество копий каждой партиции на разных брокерах. При replication factor=3 данные хранятся на трёх брокерах. Защищает от потери данных при падении брокера. Минимум 2 для production, рекомендуется 3.
- Retention
- Время хранения событий в Kafka. По умолчанию 7 дней (168 часов). Старые события автоматически удаляются. Можно настроить по времени (log.retention.hours) или размеру (log.retention.bytes).
- Throughput
- Пропускная способность системы — количество событий, обрабатываемых в единицу времени. Измеряется в RPS (requests per second) или events/sec. В Kafka зависит от количества партиций и производительности consumer.
- Under-replicated partitions
- Партиции, у которых не все реплики синхронизированы. Означает проблемы с брокерами или сетью. Критическая метрика для мониторинга — высокое значение грозит потерей данных при падении лидера.
- ZooKeeper / KRaft
- Система координации кластера Kafka. ZooKeeper отвечал за выбор лидера, хранение метаданных и координацию брокеров. С Kafka 3.0+ используется KRaft — встроенный механизм без внешних зависимостей.



