Skip to main content
Back to course
Распределенная трассировка: от основ до production
8 / 1844%

Queue-Based Tracing: Kafka, RabbitMQ и асинхронная обработка

85 минут

Queue-Based Tracing: Kafka, RabbitMQ и асинхронная обработка

Цель урока

Научиться трассировать асинхронные системы с message queues, где HTTP-заголовки не работают. Вы узнаете:

  • Почему traces "ломаются" при использовании очередей
  • Как вручную передавать trace context через Kafka headers
  • Как настроить трассировку для RabbitMQ с correlation_id
  • Debugging Dead Letter Queues через traces
  • Best practices для async processing patterns

Готовые примеры кода

Полные рабочие примеры с Kafka и RabbitMQ доступны в репозитории курса.

Готовые примеры: Kafka + RabbitMQ Tracing

Kafka примеры: 07-kafka-tracing/ — Order Service → Kafka → Payment Service → Kafka → Notification Service

RabbitMQ примеры: 07-rabbitmq-tracing/ — Producer → RabbitMQ → Consumer с correlation_id

Быстрый старт (Kafka):

cd 07-kafka-tracing
docker-compose up -d
 
# Подождать ~30 секунд пока Kafka запустится
sleep 30
 
# Создать заказ
curl -X POST http://localhost:3001/orders \
  -H "Content-Type: application/json" \
  -d '{"userId": 42, "items": ["book", "pen"], "total": 25.99}'
 
# Посмотреть trace в Jaeger
open http://localhost:16686

Быстрый старт (RabbitMQ):

cd 07-rabbitmq-tracing
docker-compose up -d
 
# Отправить сообщение
curl -X POST http://localhost:3002/messages \
  -H "Content-Type: application/json" \
  -d '{"type": "email", "to": "user@example.com", "body": "Hello!"}'
 
# RabbitMQ Management UI
open http://localhost:15672  # guest/guest
 
# Jaeger UI
open http://localhost:16686

Подробные инструкции: README.md


Проблема: Broken Traces в асинхронных системах

В синхронных HTTP вызовах trace context передается автоматически через заголовки traceparent:

Но в асинхронных системах с очередями:

Проблема: Consumer создает новый trace вместо продолжения существующего!

Результат в Jaeger:

Вместо одного trace на 4.8s видим два orphan traces по 50ms и 200ms. Невозможно понять полную картину!


Решение: Manual Context Propagation

Нужно вручную передать trace context в message headers:

Результат в Jaeger - единый trace:

Теперь видно всю цепочку и bottleneck (DB insert 3.8s)!


Архитектура для практики

Создадим систему заказов с тремя сервисами и Kafka:

Что увидим в Jaeger:


Часть 1: Kafka Tracing

Docker Compose Setup

Создайте файл docker-compose.yml:

version: "3.8"
 
services:
  # Jaeger для трейсов
  jaeger:
    image: jaegertracing/all-in-one:1.53
    ports:
      - "16686:16686" # UI
      - "4317:4317" # OTLP gRPC
      - "4318:4318" # OTLP HTTP
    environment:
      - COLLECTOR_OTLP_ENABLED=true
 
  # Zookeeper (нужен для Kafka)
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
 
  # Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

Запуск:

docker-compose up -d
 
# Проверка
docker-compose ps  # Все сервисы должны быть Up

Kafka Producer: Order Service (Node.js)

Структура проекта:

order-service/
├── package.json
├── tracing.js
└── producer.js

Файл package.json:

{
  "name": "order-service",
  "version": "1.0.0",
  "scripts": {
    "start": "node producer.js"
  },
  "dependencies": {
    "express": "^4.18.2",
    "kafkajs": "^2.2.4",
    "@opentelemetry/sdk-node": "^0.46.0",
    "@opentelemetry/api": "^1.7.0",
    "@opentelemetry/auto-instrumentations-node": "^0.40.0",
    "@opentelemetry/exporter-trace-otlp-http": "^0.46.0",
    "@opentelemetry/semantic-conventions": "^1.18.1"
  }
}

Файл tracing.js:

const { NodeSDK } = require("@opentelemetry/sdk-node");
const {
  OTLPTraceExporter,
} = require("@opentelemetry/exporter-trace-otlp-http");
const {
  getNodeAutoInstrumentations,
} = require("@opentelemetry/auto-instrumentations-node");
 
const sdk = new NodeSDK({
  serviceName: "order-service",
  traceExporter: new OTLPTraceExporter({
    url: "http://localhost:4318/v1/traces",
  }),
  instrumentations: [getNodeAutoInstrumentations()],
});
 
sdk.start();
 
process.on("SIGTERM", () => {
  sdk.shutdown().finally(() => process.exit(0));
});

Файл producer.js:

require("./tracing"); // ВАЖНО: Инициализация трейсинга ПЕРВОЙ
const express = require("express");
const { Kafka } = require("kafkajs");
const {
  trace,
  context,
  propagation,
  SpanStatusCode,
} = require("@opentelemetry/api");
 
const app = express();
app.use(express.json());
 
const kafka = new Kafka({
  clientId: "order-service",
  brokers: ["localhost:9092"],
});
 
const producer = kafka.producer();
const tracer = trace.getTracer("order-service");
 
// Подключение к Kafka
producer.connect().then(() => {
  console.log("✅ Kafka producer connected");
});
 
app.post("/orders", async (req, res) => {
  // Создаем span для всей операции
  return tracer.startActiveSpan("POST /orders", async (span) => {
    try {
      const order = {
        id: Math.random().toString(36).substring(7),
        product: req.body.product || "Unknown",
        amount: req.body.amount || 100,
        timestamp: new Date().toISOString(),
      };
 
      span.setAttribute("order.id", order.id);
      span.setAttribute("order.product", order.product);
      span.setAttribute("order.amount", order.amount);
 
      // ⭐ КРИТИЧЕСКОЕ: Inject trace context в Kafka headers
      const headers = {};
      propagation.inject(context.active(), headers);
 
      console.log("📤 Injected trace context:", headers);
 
      // Отправка в Kafka с trace context
      await producer.send({
        topic: "orders.created",
        messages: [
          {
            key: order.id,
            value: JSON.stringify(order),
            headers: headers, // ⭐ Trace context здесь!
          },
        ],
      });
 
      span.addEvent("order.published", {
        "messaging.destination": "orders.created",
        "messaging.message_id": order.id,
      });
 
      span.setStatus({ code: SpanStatusCode.OK });
      res.json({ success: true, order });
    } catch (error) {
      span.recordException(error);
      span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
      res.status(500).json({ error: error.message });
    } finally {
      span.end();
    }
  });
});
 
const PORT = 3001;
app.listen(PORT, () => {
  console.log(`🚀 Order Service running on port ${PORT}`);
});

Ключевые моменты:

  1. propagation.inject(context.active(), headers) - вставляет traceparent в headers
  2. Kafka message headers - передаются как обычные key-value пары
  3. Span attributes - добавляем метаданные заказа для debugging

Kafka Consumer: Payment Service (Python)

Структура проекта:

payment-service/
├── requirements.txt
├── tracing.py
└── consumer.py

Файл requirements.txt:

kafka-python==2.0.2
opentelemetry-sdk==1.21.0
opentelemetry-exporter-otlp==1.21.0
opentelemetry-instrumentation==0.42b0
opentelemetry-semantic-conventions==0.42b0

Файл tracing.py:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
 
# Resource с названием сервиса
resource = Resource.create({"service.name": "payment-service"})
 
# TracerProvider
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(
    OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")
)
provider.add_span_processor(processor)
 
trace.set_tracer_provider(provider)

Файл consumer.py:

import tracing  # ВАЖНО: Инициализация ПЕРВОЙ
import json
import time
from kafka import KafkaConsumer
from opentelemetry import trace, context, propagation
from opentelemetry.trace import SpanKind, Status, StatusCode
 
tracer = trace.get_tracer("payment-service")
 
# Kafka Consumer
consumer = KafkaConsumer(
    "orders.created",
    bootstrap_servers=["localhost:9092"],
    group_id="payment-service-group",
    auto_offset_reset="earliest",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
 
print("✅ Payment Service consumer started")
 
for message in consumer:
    # ⭐ КРИТИЧЕСКОЕ: Extract trace context из Kafka headers
    headers_dict = {k: v.decode("utf-8") if isinstance(v, bytes) else v
                    for k, v in message.headers}
 
    print(f"📥 Extracted trace context: {headers_dict}")
 
    # Восстанавливаем context из headers
    ctx = propagation.extract(headers_dict)
 
    # Создаем span КАК CHILD существующего trace
    with tracer.start_as_current_span(
        "process.payment",
        context=ctx,  # ⭐ Используем извлеченный context!
        kind=SpanKind.CONSUMER,
    ) as span:
        order = message.value
 
        span.set_attribute("messaging.system", "kafka")
        span.set_attribute("messaging.destination", "orders.created")
        span.set_attribute("messaging.message_id", order["id"])
        span.set_attribute("order.id", order["id"])
        span.set_attribute("order.product", order["product"])
        span.set_attribute("order.amount", order["amount"])
 
        try:
            # Имитация обработки платежа
            print(f"💳 Processing payment for order {order['id']}...")
            time.sleep(2)  # Эмуляция работы
 
            span.add_event("payment.processed", {
                "payment.status": "success",
                "payment.amount": order["amount"],
            })
 
            span.set_status(Status(StatusCode.OK))
            print(f"✅ Payment processed for order {order['id']}")
 
        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            print(f"❌ Payment failed: {e}")

Ключевые моменты:

  1. propagation.extract(headers_dict) - восстанавливает trace context из headers
  2. context=ctx - передаем восстановленный context в span
  3. SpanKind.CONSUMER - помечаем span как consumer для визуализации
  4. Span attributes - используем semantic conventions для messaging

Kafka Consumer: Notification Service (Go)

Структура проекта:

notification-service/
├── go.mod
├── go.sum
├── tracing.go
└── main.go

Файл go.mod:

module notification-service
 
go 1.21
 
require (
    github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
    go.opentelemetry.io/otel v1.21.0
    go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0
    go.opentelemetry.io/otel/sdk v1.21.0
    go.opentelemetry.io/otel/trace v1.21.0
)

Файл tracing.go:

package main
 
import (
    "context"
    "log"
 
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)
 
func InitTracing() func() {
    ctx := context.Background()
 
    // OTLP Exporter
    exporter, err := otlptracehttp.New(ctx,
        otlptracehttp.WithEndpoint("localhost:4318"),
        otlptracehttp.WithInsecure(),
    )
    if err != nil {
        log.Fatal(err)
    }
 
    // Resource
    res, err := resource.New(ctx,
        resource.WithAttributes(
            semconv.ServiceName("notification-service"),
        ),
    )
    if err != nil {
        log.Fatal(err)
    }
 
    // TracerProvider
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(res),
    )
 
    otel.SetTracerProvider(tp)
 
    return func() {
        if err := tp.Shutdown(ctx); err != nil {
            log.Fatal(err)
        }
    }
}

Файл main.go:

package main
 
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"
 
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/trace"
)
 
type Order struct {
    ID        string  `json:"id"`
    Product   string  `json:"product"`
    Amount    float64 `json:"amount"`
    Timestamp string  `json:"timestamp"`
}
 
func main() {
    // Инициализация трейсинга
    shutdown := InitTracing()
    defer shutdown()
 
    tracer := otel.Tracer("notification-service")
 
    // Kafka Consumer
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "notification-service-group",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()
 
    consumer.Subscribe("payments.processed", nil)
    log.Println("✅ Notification Service consumer started")
 
    for {
        msg, err := consumer.ReadMessage(-1)
        if err != nil {
            log.Printf("Consumer error: %v\n", err)
            continue
        }
 
        // ⭐ КРИТИЧЕСКОЕ: Extract trace context из Kafka headers
        headersMap := make(map[string]string)
        for _, header := range msg.Headers {
            headersMap[header.Key] = string(header.Value)
        }
 
        fmt.Printf("📥 Extracted trace context: %+v\n", headersMap)
 
        // Восстанавливаем context
        ctx := propagation.TraceContext{}.Extract(
            context.Background(),
            propagation.MapCarrier(headersMap),
        )
 
        // Создаем span КАК CHILD
        ctx, span := tracer.Start(ctx, "send.notification",
            trace.WithSpanKind(trace.SpanKindConsumer),
        )
 
        var order Order
        if err := json.Unmarshal(msg.Value, &order); err != nil {
            span.RecordError(err)
            span.SetStatus(codes.Error, err.Error())
            span.End()
            continue
        }
 
        span.SetAttributes(
            attribute.String("messaging.system", "kafka"),
            attribute.String("messaging.destination", string(msg.TopicPartition.Topic)),
            attribute.String("order.id", order.ID),
            attribute.String("order.product", order.Product),
        )
 
        // Имитация отправки email
        fmt.Printf("📧 Sending notification for order %s...\n", order.ID)
        time.Sleep(800 * time.Millisecond)
 
        span.AddEvent("notification.sent", trace.WithAttributes(
            attribute.String("notification.type", "email"),
            attribute.String("order.id", order.ID),
        ))
 
        span.SetStatus(codes.Ok, "notification sent")
        span.End()
 
        fmt.Printf("✅ Notification sent for order %s\n", order.ID)
    }
}

Ключевые моменты Go:

  1. propagation.TraceContext{}.Extract() - восстанавливает W3C Trace Context
  2. trace.WithSpanKind(trace.SpanKindConsumer) - правильный тип span
  3. Headers mapping - конвертируем Kafka headers в map для propagation

Часть 2: RabbitMQ Tracing

Docker Compose для RabbitMQ

Добавьте в docker-compose.yml:

# RabbitMQ
rabbitmq:
  image: rabbitmq:3.12-management
  ports:
    - "5672:5672" # AMQP
    - "15672:15672" # Management UI
  environment:
    RABBITMQ_DEFAULT_USER: admin
    RABBITMQ_DEFAULT_PASS: admin

RabbitMQ Producer (Node.js)

const amqp = require("amqplib");
const { propagation, context } = require("@opentelemetry/api");
 
async function publishWithTracing(order) {
  const connection = await amqp.connect("amqp://admin:admin@localhost:5672");
  const channel = await connection.createChannel();
 
  const queue = "tasks";
  await channel.assertQueue(queue, { durable: true });
 
  // ⭐ Inject trace context в RabbitMQ headers
  const headers = {};
  propagation.inject(context.active(), headers);
 
  channel.sendToQueue(queue, Buffer.from(JSON.stringify(order)), {
    persistent: true,
    headers: headers, // ⭐ Trace context
  });
 
  console.log("📤 Published to RabbitMQ with trace context");
 
  await channel.close();
  await connection.close();
}

RabbitMQ Consumer (Python)

import pika
from opentelemetry import propagation
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
 
def callback(ch, method, properties, body):
    # ⭐ Extract trace context из headers
    headers_dict = properties.headers or {}
    ctx = propagation.extract(headers_dict)
 
    with tracer.start_as_current_span(
        "process.task",
        context=ctx,  # ⭐ Продолжаем trace
    ) as span:
        # Process message
        print(f"Processing: {body}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
 
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

Тестирование полного Flow

1. Запустите все сервисы

# Terminal 1: Infrastructure
docker-compose up -d
 
# Terminal 2: Order Service (Node.js)
cd order-service
npm install
npm start
 
# Terminal 3: Payment Service (Python)
cd payment-service
pip install -r requirements.txt
python consumer.py
 
# Terminal 4: Notification Service (Go)
cd notification-service
go mod download
go run .

2. Создайте заказ

curl -X POST http://localhost:3001/orders \
  -H "Content-Type: application/json" \
  -d '{
    "product": "Laptop",
    "amount": 1500
  }'

3. Откройте Jaeger UI

http://localhost:16686

Что увидите:

Единый trace через все сервисы! 🎉


Практические задания

Задание 1: Найти bottleneck в async flow

  1. Запустите систему
  2. Создайте 10 заказов подряд
  3. В Jaeger найдите самый медленный trace
  4. Определите bottleneck (payment processing или notification?)

Подсказка: Используйте Jaeger UI → Service & Operation → Compare traces

Задание 2: Dead Letter Queue Debugging

Добавьте DLQ для failed payments:

// В payment service: если amount > 10000, throw error
if (order.amount > 10000) {
  throw new Error("Amount too high!");
}

Вопросы:

  • Как выглядит failed trace в Jaeger?
  • Можно ли проследить сообщение до DLQ?

Задание 3: Retry Storm Detection

Добавьте retry logic в consumer:

MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
    try:
        process_payment(order)
        break
    except Exception as e:
        if attempt == MAX_RETRIES - 1:
            raise
        time.sleep(1)

Проверьте в Jaeger:

  • Видны ли все retry attempts?
  • Как отличить retry от нормальной обработки?

Common Pitfalls

1. Header Serialization Issues

Проблема:

// Kafka headers ожидают Buffer/string
headers: {
  traceparent: context.active();
} // WRONG!

Правильно:

const headers = {};
propagation.inject(context.active(), headers);
// headers теперь { traceparent: "00-abc-123-01" }

2. Clock Skew между Producer и Consumer

Если часы producer и consumer не синхронизированы:

Trace выглядит сломанным! Consumer span раньше producer span!

Решение: Синхронизируйте время (NTP) на всех серверах.

3. Batch Processing Context Loss

При обработке batch messages:

# ❌ WRONG: Теряется context для каждого сообщения
messages = consumer.consume_batch(100)
for msg in messages:
    process(msg)  # Все spans под одним parent!
 
# ✅ RIGHT: Восстанавливаем context для каждого
for msg in messages:
    ctx = propagation.extract(msg.headers)
    with tracer.start_as_current_span("process", context=ctx):
        process(msg)

4. Trace Orphaning при Crashes

Если consumer упал до отправки span:

Producer → Kafka → [Consumer crashed] → Orphan span!

Mitigation:

  • Используйте BatchSpanProcessor (по умолчанию)
  • Настройте shutdown timeout
  • Мониторьте orphan traces в Jaeger

Best Practices

✅ 1. Всегда передавайте Trace Context

// В КАЖДОМ producer
const headers = {};
propagation.inject(context.active(), headers);
await producer.send({ headers });

✅ 2. Используйте Semantic Conventions

span.set_attribute("messaging.system", "kafka")  # kafka, rabbitmq, sqs
span.set_attribute("messaging.destination", "orders.created")  # topic/queue
span.set_attribute("messaging.message_id", msg.id)
span.set_attribute("messaging.operation", "publish")  # publish/receive

Полный список: OpenTelemetry Semantic Conventions

✅ 3. Мониторьте Queue Lag

Добавьте метрики:

queue_lag = current_offset - consumer_offset
span.set_attribute("messaging.consumer_lag", queue_lag)

Коррелируйте с traces для понимания delays.

✅ 4. Trace Sampling для High-Volume Queues

Для очередей с миллионами сообщений:

// Probability-based sampling
const shouldSample = Math.random() < 0.01; // 1%
if (shouldSample) {
  // ... create span
}

Или используйте Tail-based sampling в OTel Collector (урок 09).

✅ 5. DLQ Tracing Pattern

try:
    process(message)
except Exception as e:
    span.record_exception(e)
    span.set_status(StatusCode.ERROR)
    # Отправляем в DLQ С TRACE CONTEXT
    dlq_headers = {}
    propagation.inject(context.current(), dlq_headers)
    send_to_dlq(message, headers=dlq_headers)

Теперь можно проследить message от original queue до DLQ!


Что дальше

В следующем уроке рассмотрим gRPC трассировку - еще один протокол, где context propagation работает иначе чем HTTP.

Поздравляем! Вы овладели queue-based tracing - одной из самых сложных тем в distributed tracing. Теперь вы можете отлаживать async системы так же легко, как синхронные HTTP API.


Дополнительные материалы

Queue-Based Tracing: Kafka, RabbitMQ и асинхронная обработка — Распределенная трассировка: от основ до production — Potapov.me