Queue-Based Tracing: Kafka, RabbitMQ и асинхронная обработка
Queue-Based Tracing: Kafka, RabbitMQ и асинхронная обработка
Цель урока
Научиться трассировать асинхронные системы с message queues, где HTTP-заголовки не работают. Вы узнаете:
- Почему traces "ломаются" при использовании очередей
- Как вручную передавать trace context через Kafka headers
- Как настроить трассировку для RabbitMQ с correlation_id
- Debugging Dead Letter Queues через traces
- Best practices для async processing patterns
Готовые примеры кода
Полные рабочие примеры с Kafka и RabbitMQ доступны в репозитории курса.
Готовые примеры: Kafka + RabbitMQ Tracing
Kafka примеры: 07-kafka-tracing/ — Order Service → Kafka → Payment Service → Kafka → Notification Service
RabbitMQ примеры: 07-rabbitmq-tracing/ — Producer → RabbitMQ → Consumer с correlation_id
Быстрый старт (Kafka):
cd 07-kafka-tracing
docker-compose up -d
# Подождать ~30 секунд пока Kafka запустится
sleep 30
# Создать заказ
curl -X POST http://localhost:3001/orders \
-H "Content-Type: application/json" \
-d '{"userId": 42, "items": ["book", "pen"], "total": 25.99}'
# Посмотреть trace в Jaeger
open http://localhost:16686Быстрый старт (RabbitMQ):
cd 07-rabbitmq-tracing
docker-compose up -d
# Отправить сообщение
curl -X POST http://localhost:3002/messages \
-H "Content-Type: application/json" \
-d '{"type": "email", "to": "user@example.com", "body": "Hello!"}'
# RabbitMQ Management UI
open http://localhost:15672 # guest/guest
# Jaeger UI
open http://localhost:16686Подробные инструкции: README.md
Проблема: Broken Traces в асинхронных системах
В синхронных HTTP вызовах trace context передается автоматически через заголовки traceparent:
Но в асинхронных системах с очередями:
Проблема: Consumer создает новый trace вместо продолжения существующего!
Результат в Jaeger:
Вместо одного trace на 4.8s видим два orphan traces по 50ms и 200ms. Невозможно понять полную картину!
Решение: Manual Context Propagation
Нужно вручную передать trace context в message headers:
Результат в Jaeger - единый trace:
Теперь видно всю цепочку и bottleneck (DB insert 3.8s)!
Архитектура для практики
Создадим систему заказов с тремя сервисами и Kafka:
Что увидим в Jaeger:
Часть 1: Kafka Tracing
Docker Compose Setup
Создайте файл docker-compose.yml:
version: "3.8"
services:
# Jaeger для трейсов
jaeger:
image: jaegertracing/all-in-one:1.53
ports:
- "16686:16686" # UI
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
environment:
- COLLECTOR_OTLP_ENABLED=true
# Zookeeper (нужен для Kafka)
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
# Kafka
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"Запуск:
docker-compose up -d
# Проверка
docker-compose ps # Все сервисы должны быть UpKafka Producer: Order Service (Node.js)
Структура проекта:
order-service/
├── package.json
├── tracing.js
└── producer.jsФайл package.json:
{
"name": "order-service",
"version": "1.0.0",
"scripts": {
"start": "node producer.js"
},
"dependencies": {
"express": "^4.18.2",
"kafkajs": "^2.2.4",
"@opentelemetry/sdk-node": "^0.46.0",
"@opentelemetry/api": "^1.7.0",
"@opentelemetry/auto-instrumentations-node": "^0.40.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.46.0",
"@opentelemetry/semantic-conventions": "^1.18.1"
}
}Файл tracing.js:
const { NodeSDK } = require("@opentelemetry/sdk-node");
const {
OTLPTraceExporter,
} = require("@opentelemetry/exporter-trace-otlp-http");
const {
getNodeAutoInstrumentations,
} = require("@opentelemetry/auto-instrumentations-node");
const sdk = new NodeSDK({
serviceName: "order-service",
traceExporter: new OTLPTraceExporter({
url: "http://localhost:4318/v1/traces",
}),
instrumentations: [getNodeAutoInstrumentations()],
});
sdk.start();
process.on("SIGTERM", () => {
sdk.shutdown().finally(() => process.exit(0));
});Файл producer.js:
require("./tracing"); // ВАЖНО: Инициализация трейсинга ПЕРВОЙ
const express = require("express");
const { Kafka } = require("kafkajs");
const {
trace,
context,
propagation,
SpanStatusCode,
} = require("@opentelemetry/api");
const app = express();
app.use(express.json());
const kafka = new Kafka({
clientId: "order-service",
brokers: ["localhost:9092"],
});
const producer = kafka.producer();
const tracer = trace.getTracer("order-service");
// Подключение к Kafka
producer.connect().then(() => {
console.log("✅ Kafka producer connected");
});
app.post("/orders", async (req, res) => {
// Создаем span для всей операции
return tracer.startActiveSpan("POST /orders", async (span) => {
try {
const order = {
id: Math.random().toString(36).substring(7),
product: req.body.product || "Unknown",
amount: req.body.amount || 100,
timestamp: new Date().toISOString(),
};
span.setAttribute("order.id", order.id);
span.setAttribute("order.product", order.product);
span.setAttribute("order.amount", order.amount);
// ⭐ КРИТИЧЕСКОЕ: Inject trace context в Kafka headers
const headers = {};
propagation.inject(context.active(), headers);
console.log("📤 Injected trace context:", headers);
// Отправка в Kafka с trace context
await producer.send({
topic: "orders.created",
messages: [
{
key: order.id,
value: JSON.stringify(order),
headers: headers, // ⭐ Trace context здесь!
},
],
});
span.addEvent("order.published", {
"messaging.destination": "orders.created",
"messaging.message_id": order.id,
});
span.setStatus({ code: SpanStatusCode.OK });
res.json({ success: true, order });
} catch (error) {
span.recordException(error);
span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
res.status(500).json({ error: error.message });
} finally {
span.end();
}
});
});
const PORT = 3001;
app.listen(PORT, () => {
console.log(`🚀 Order Service running on port ${PORT}`);
});Ключевые моменты:
propagation.inject(context.active(), headers)- вставляетtraceparentв headers- Kafka message headers - передаются как обычные key-value пары
- Span attributes - добавляем метаданные заказа для debugging
Kafka Consumer: Payment Service (Python)
Структура проекта:
payment-service/
├── requirements.txt
├── tracing.py
└── consumer.pyФайл requirements.txt:
kafka-python==2.0.2
opentelemetry-sdk==1.21.0
opentelemetry-exporter-otlp==1.21.0
opentelemetry-instrumentation==0.42b0
opentelemetry-semantic-conventions==0.42b0Файл tracing.py:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
# Resource с названием сервиса
resource = Resource.create({"service.name": "payment-service"})
# TracerProvider
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(
OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")
)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)Файл consumer.py:
import tracing # ВАЖНО: Инициализация ПЕРВОЙ
import json
import time
from kafka import KafkaConsumer
from opentelemetry import trace, context, propagation
from opentelemetry.trace import SpanKind, Status, StatusCode
tracer = trace.get_tracer("payment-service")
# Kafka Consumer
consumer = KafkaConsumer(
"orders.created",
bootstrap_servers=["localhost:9092"],
group_id="payment-service-group",
auto_offset_reset="earliest",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
print("✅ Payment Service consumer started")
for message in consumer:
# ⭐ КРИТИЧЕСКОЕ: Extract trace context из Kafka headers
headers_dict = {k: v.decode("utf-8") if isinstance(v, bytes) else v
for k, v in message.headers}
print(f"📥 Extracted trace context: {headers_dict}")
# Восстанавливаем context из headers
ctx = propagation.extract(headers_dict)
# Создаем span КАК CHILD существующего trace
with tracer.start_as_current_span(
"process.payment",
context=ctx, # ⭐ Используем извлеченный context!
kind=SpanKind.CONSUMER,
) as span:
order = message.value
span.set_attribute("messaging.system", "kafka")
span.set_attribute("messaging.destination", "orders.created")
span.set_attribute("messaging.message_id", order["id"])
span.set_attribute("order.id", order["id"])
span.set_attribute("order.product", order["product"])
span.set_attribute("order.amount", order["amount"])
try:
# Имитация обработки платежа
print(f"💳 Processing payment for order {order['id']}...")
time.sleep(2) # Эмуляция работы
span.add_event("payment.processed", {
"payment.status": "success",
"payment.amount": order["amount"],
})
span.set_status(Status(StatusCode.OK))
print(f"✅ Payment processed for order {order['id']}")
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
print(f"❌ Payment failed: {e}")Ключевые моменты:
propagation.extract(headers_dict)- восстанавливает trace context из headerscontext=ctx- передаем восстановленный context в spanSpanKind.CONSUMER- помечаем span как consumer для визуализации- Span attributes - используем semantic conventions для messaging
Kafka Consumer: Notification Service (Go)
Структура проекта:
notification-service/
├── go.mod
├── go.sum
├── tracing.go
└── main.goФайл go.mod:
module notification-service
go 1.21
require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
)Файл tracing.go:
package main
import (
"context"
"log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)
func InitTracing() func() {
ctx := context.Background()
// OTLP Exporter
exporter, err := otlptracehttp.New(ctx,
otlptracehttp.WithEndpoint("localhost:4318"),
otlptracehttp.WithInsecure(),
)
if err != nil {
log.Fatal(err)
}
// Resource
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceName("notification-service"),
),
)
if err != nil {
log.Fatal(err)
}
// TracerProvider
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
)
otel.SetTracerProvider(tp)
return func() {
if err := tp.Shutdown(ctx); err != nil {
log.Fatal(err)
}
}
}Файл main.go:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
type Order struct {
ID string `json:"id"`
Product string `json:"product"`
Amount float64 `json:"amount"`
Timestamp string `json:"timestamp"`
}
func main() {
// Инициализация трейсинга
shutdown := InitTracing()
defer shutdown()
tracer := otel.Tracer("notification-service")
// Kafka Consumer
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "notification-service-group",
"auto.offset.reset": "earliest",
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
consumer.Subscribe("payments.processed", nil)
log.Println("✅ Notification Service consumer started")
for {
msg, err := consumer.ReadMessage(-1)
if err != nil {
log.Printf("Consumer error: %v\n", err)
continue
}
// ⭐ КРИТИЧЕСКОЕ: Extract trace context из Kafka headers
headersMap := make(map[string]string)
for _, header := range msg.Headers {
headersMap[header.Key] = string(header.Value)
}
fmt.Printf("📥 Extracted trace context: %+v\n", headersMap)
// Восстанавливаем context
ctx := propagation.TraceContext{}.Extract(
context.Background(),
propagation.MapCarrier(headersMap),
)
// Создаем span КАК CHILD
ctx, span := tracer.Start(ctx, "send.notification",
trace.WithSpanKind(trace.SpanKindConsumer),
)
var order Order
if err := json.Unmarshal(msg.Value, &order); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.End()
continue
}
span.SetAttributes(
attribute.String("messaging.system", "kafka"),
attribute.String("messaging.destination", string(msg.TopicPartition.Topic)),
attribute.String("order.id", order.ID),
attribute.String("order.product", order.Product),
)
// Имитация отправки email
fmt.Printf("📧 Sending notification for order %s...\n", order.ID)
time.Sleep(800 * time.Millisecond)
span.AddEvent("notification.sent", trace.WithAttributes(
attribute.String("notification.type", "email"),
attribute.String("order.id", order.ID),
))
span.SetStatus(codes.Ok, "notification sent")
span.End()
fmt.Printf("✅ Notification sent for order %s\n", order.ID)
}
}Ключевые моменты Go:
propagation.TraceContext{}.Extract()- восстанавливает W3C Trace Contexttrace.WithSpanKind(trace.SpanKindConsumer)- правильный тип span- Headers mapping - конвертируем Kafka headers в map для propagation
Часть 2: RabbitMQ Tracing
Docker Compose для RabbitMQ
Добавьте в docker-compose.yml:
# RabbitMQ
rabbitmq:
image: rabbitmq:3.12-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: adminRabbitMQ Producer (Node.js)
const amqp = require("amqplib");
const { propagation, context } = require("@opentelemetry/api");
async function publishWithTracing(order) {
const connection = await amqp.connect("amqp://admin:admin@localhost:5672");
const channel = await connection.createChannel();
const queue = "tasks";
await channel.assertQueue(queue, { durable: true });
// ⭐ Inject trace context в RabbitMQ headers
const headers = {};
propagation.inject(context.active(), headers);
channel.sendToQueue(queue, Buffer.from(JSON.stringify(order)), {
persistent: true,
headers: headers, // ⭐ Trace context
});
console.log("📤 Published to RabbitMQ with trace context");
await channel.close();
await connection.close();
}RabbitMQ Consumer (Python)
import pika
from opentelemetry import propagation
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
def callback(ch, method, properties, body):
# ⭐ Extract trace context из headers
headers_dict = properties.headers or {}
ctx = propagation.extract(headers_dict)
with tracer.start_as_current_span(
"process.task",
context=ctx, # ⭐ Продолжаем trace
) as span:
# Process message
print(f"Processing: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()Тестирование полного Flow
1. Запустите все сервисы
# Terminal 1: Infrastructure
docker-compose up -d
# Terminal 2: Order Service (Node.js)
cd order-service
npm install
npm start
# Terminal 3: Payment Service (Python)
cd payment-service
pip install -r requirements.txt
python consumer.py
# Terminal 4: Notification Service (Go)
cd notification-service
go mod download
go run .2. Создайте заказ
curl -X POST http://localhost:3001/orders \
-H "Content-Type: application/json" \
-d '{
"product": "Laptop",
"amount": 1500
}'3. Откройте Jaeger UI
http://localhost:16686Что увидите:
Единый trace через все сервисы! 🎉
Практические задания
Задание 1: Найти bottleneck в async flow
- Запустите систему
- Создайте 10 заказов подряд
- В Jaeger найдите самый медленный trace
- Определите bottleneck (payment processing или notification?)
Подсказка: Используйте Jaeger UI → Service & Operation → Compare traces
Задание 2: Dead Letter Queue Debugging
Добавьте DLQ для failed payments:
// В payment service: если amount > 10000, throw error
if (order.amount > 10000) {
throw new Error("Amount too high!");
}Вопросы:
- Как выглядит failed trace в Jaeger?
- Можно ли проследить сообщение до DLQ?
Задание 3: Retry Storm Detection
Добавьте retry logic в consumer:
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
try:
process_payment(order)
break
except Exception as e:
if attempt == MAX_RETRIES - 1:
raise
time.sleep(1)Проверьте в Jaeger:
- Видны ли все retry attempts?
- Как отличить retry от нормальной обработки?
Common Pitfalls
1. Header Serialization Issues
❌ Проблема:
// Kafka headers ожидают Buffer/string
headers: {
traceparent: context.active();
} // WRONG!✅ Правильно:
const headers = {};
propagation.inject(context.active(), headers);
// headers теперь { traceparent: "00-abc-123-01" }2. Clock Skew между Producer и Consumer
Если часы producer и consumer не синхронизированы:
Trace выглядит сломанным! Consumer span раньше producer span!
Решение: Синхронизируйте время (NTP) на всех серверах.
3. Batch Processing Context Loss
При обработке batch messages:
# ❌ WRONG: Теряется context для каждого сообщения
messages = consumer.consume_batch(100)
for msg in messages:
process(msg) # Все spans под одним parent!
# ✅ RIGHT: Восстанавливаем context для каждого
for msg in messages:
ctx = propagation.extract(msg.headers)
with tracer.start_as_current_span("process", context=ctx):
process(msg)4. Trace Orphaning при Crashes
Если consumer упал до отправки span:
Producer → Kafka → [Consumer crashed] → Orphan span!Mitigation:
- Используйте
BatchSpanProcessor(по умолчанию) - Настройте
shutdowntimeout - Мониторьте orphan traces в Jaeger
Best Practices
✅ 1. Всегда передавайте Trace Context
// В КАЖДОМ producer
const headers = {};
propagation.inject(context.active(), headers);
await producer.send({ headers });✅ 2. Используйте Semantic Conventions
span.set_attribute("messaging.system", "kafka") # kafka, rabbitmq, sqs
span.set_attribute("messaging.destination", "orders.created") # topic/queue
span.set_attribute("messaging.message_id", msg.id)
span.set_attribute("messaging.operation", "publish") # publish/receiveПолный список: OpenTelemetry Semantic Conventions
✅ 3. Мониторьте Queue Lag
Добавьте метрики:
queue_lag = current_offset - consumer_offset
span.set_attribute("messaging.consumer_lag", queue_lag)Коррелируйте с traces для понимания delays.
✅ 4. Trace Sampling для High-Volume Queues
Для очередей с миллионами сообщений:
// Probability-based sampling
const shouldSample = Math.random() < 0.01; // 1%
if (shouldSample) {
// ... create span
}Или используйте Tail-based sampling в OTel Collector (урок 09).
✅ 5. DLQ Tracing Pattern
try:
process(message)
except Exception as e:
span.record_exception(e)
span.set_status(StatusCode.ERROR)
# Отправляем в DLQ С TRACE CONTEXT
dlq_headers = {}
propagation.inject(context.current(), dlq_headers)
send_to_dlq(message, headers=dlq_headers)Теперь можно проследить message от original queue до DLQ!
Что дальше
В следующем уроке рассмотрим gRPC трассировку - еще один протокол, где context propagation работает иначе чем HTTP.
Поздравляем! Вы овладели queue-based tracing - одной из самых сложных тем в distributed tracing. Теперь вы можете отлаживать async системы так же легко, как синхронные HTTP API.