Перейти к содержимому
К программе курса
k6: нагрузочное тестирование как система
14 / 1782%

WebSockets и gRPC в k6

15 минут

Классификация

  • [Core]: HTTP/1.1, HTTP/2, WebSocket, gRPC.
  • [Ext]: Kafka, Redis, MSSQL и др. через xk6 (кастомный бинарь). Browser API оставляем как бонус/UX-smoke вне фокуса этого урока.

WebSocket [Core]

WebSocket — это постоянное bidirectional соединение для real-time коммуникации (чаты, live updates, gaming).

Базовый пример

import ws from "k6/ws";
import { check, sleep } from "k6";
import { Counter, Trend } from "k6/metrics";
 
// Кастомные метрики для WebSocket
const wsConnections = new Counter("ws_connections");
const wsMessages = new Counter("ws_messages_received");
const wsLatency = new Trend("ws_message_latency");
 
export const options = {
  scenarios: {
    websocket_load: {
      executor: "constant-vus",
      vus: 50,
      duration: "5m",
    },
  },
  thresholds: {
    ws_connections: ["count>0"],
    ws_messages_received: ["count>100"],
    ws_message_latency: ["p(95)<500"], // Latency сообщений < 500ms
  },
};
 
export default function () {
  const url = `${__ENV.WS_URL}/realtime`;
  const params = {
    headers: { Authorization: `Bearer ${__ENV.TOKEN}` },
  };
 
  const res = ws.connect(url, params, (socket) => {
    wsConnections.add(1);
 
    socket.on("open", () => {
      console.log("WebSocket connected");
      // Отправляем ping каждую секунду
      socket.setInterval(() => {
        const sentAt = Date.now();
        socket.send(JSON.stringify({ type: "ping", sentAt }));
      }, 1000);
    });
 
    socket.on("message", (msg) => {
      wsMessages.add(1);
      const data = JSON.parse(msg);
 
      // Измеряем latency round-trip
      if (data.type === "pong" && data.sentAt) {
        const latency = Date.now() - data.sentAt;
        wsLatency.add(latency);
      }
 
      check(msg, {
        "message is valid JSON": () => {
          try {
            JSON.parse(msg);
            return true;
          } catch {
            return false;
          }
        },
        "pong received": () => data.type === "pong",
      });
    });
 
    socket.on("error", (e) => {
      console.error("WebSocket error:", e);
    });
 
    socket.on("close", () => {
      console.log("WebSocket closed");
    });
 
    // Держим соединение открытым 30 секунд
    socket.setTimeout(() => {
      socket.close();
    }, 30000);
  });
 
  check(res, {
    "status is 101": (r) => r && r.status === 101,
  });
 
  sleep(1); // Пауза между итерациями
}

Метрики WebSocket

При тестировании WebSocket важны специфические метрики:

  • ws_connections — количество успешных подключений
  • ws_messages_received — количество полученных сообщений
  • ws_message_latency — время round-trip (ping → pong)
  • ws_session_duration — как долго держим соединение
  • ws_errors — ошибки соединения

WebSocket-тесты отличаются от HTTP: здесь важна не только latency первого подключения, но и стабильность соединения во времени. Мониторьте memory leak'и на сервере при длительных соединениях.

gRPC [Core]

gRPC используется для межсервисной коммуникации с высокой производительностью и строгими контрактами (protobuf).

Полный пример с метриками

import grpc from "k6/net/grpc";
import { check, sleep } from "k6";
import { Counter, Trend } from "k6/metrics";
 
// Кастомные метрики для gRPC
const grpcCalls = new Counter("grpc_calls_total");
const grpcErrors = new Counter("grpc_errors_total");
const grpcLatency = new Trend("grpc_request_duration");
 
const client = new grpc.Client();
 
export const options = {
  scenarios: {
    grpc_load: {
      executor: "constant-arrival-rate",
      rate: 100, // 100 RPS
      duration: "5m",
      preAllocatedVUs: 50,
      maxVUs: 200,
    },
  },
  thresholds: {
    grpc_calls_total: ["count>100"],
    grpc_errors_total: ["count<10"], // Меньше 10 ошибок за весь тест
    grpc_request_duration: ["p(95)<300", "p(99)<500"],
    "grpc_req_duration{method:AddItem}": ["p(95)<200"], // Per-method threshold
  },
};
 
export function setup() {
  // Загружаем proto-файл один раз в setup
  client.load(["./proto"], "shopstack.proto");
}
 
export default function () {
  // Подключаемся к gRPC серверу
  client.connect(`${__ENV.GRPC_HOST}:50051`, {
    plaintext: true, // Для dev/stage; в проде используйте TLS
    timeout: "5s",
  });
 
  const startTime = Date.now();
 
  try {
    // Unary call (простой запрос-ответ)
    const response = client.invoke("shopstack.CartService/AddItem", {
      userId: "user-123",
      sku: "PRODUCT-456",
      quantity: 1,
    });
 
    grpcCalls.add(1);
    const duration = Date.now() - startTime;
    grpcLatency.add(duration, { method: "AddItem" });
 
    check(response, {
      "status is OK": (r) => r && r.status === grpc.StatusOK,
      "has cart": (r) => r && r.message && r.message.cart !== undefined,
      "cart item added": (r) => r && r.message.cart.items.length > 0,
    });
 
    if (response.status !== grpc.StatusOK) {
      grpcErrors.add(1);
      console.error(
        `gRPC error: ${response.status} - ${response.error.message}`
      );
    }
  } catch (e) {
    grpcErrors.add(1);
    console.error("gRPC exception:", e);
  } finally {
    client.close();
  }
 
  sleep(1);
}

Метрики gRPC

k6 автоматически собирает метрики для gRPC:

  • grpc_req_duration — latency запроса (можно фильтровать по method)
  • grpc_streams — количество открытых streaming connections
  • grpc_streams_msgs_received — сообщений получено (для streaming)
  • grpc_streams_msgs_sent — сообщений отправлено (для streaming)

Дополнительно создавайте custom метрики:

  • Per-method latency: grpc_req_duration{method:GetUser}
  • Business metrics: "items added", "payments processed"

Streaming calls (Server/Client/Bidirectional)

// Server streaming example
const stream = client.invoke("shopstack.OrderService/TrackOrder", {
  orderId: "ORDER-123",
});
 
stream.on("data", (message) => {
  console.log("Order status:", message.status);
});
 
stream.on("end", () => {
  console.log("Stream ended");
});

gRPC streaming в k6 работает, но генерирует больше load на генератор. Для тестов с тысячами одновременных streams используйте несколько k6 инстансов или k6-operator в Kubernetes.

Расширения [Ext] через xk6

xk6 позволяет собрать кастомный бинарь k6 с дополнительными протоколами и функциями.

Установка и сборка

# 1. Установите xk6 builder
go install go.k6.io/xk6/cmd/xk6@latest
 
# 2. Соберите k6 с расширением (например, Kafka)
xk6 build --with github.com/grafana/xk6-kafka@latest
 
# В директории появится бинарь ./k6 с поддержкой Kafka
./k6 version
 
# 3. Можете собрать с несколькими расширениями
xk6 build \
  --with github.com/grafana/xk6-kafka@latest \
  --with github.com/grafana/xk6-redis@latest \
  --with github.com/grafana/xk6-sql@latest

Пример: тестирование Kafka

import { check } from "k6";
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  SCHEMA_TYPE_AVRO,
} from "k6/x/kafka";
 
const writer = new Writer({
  brokers: ["localhost:9092"],
  topic: "test-topic",
});
 
const reader = new Reader({
  brokers: ["localhost:9092"],
  topic: "test-topic",
  groupID: "k6-test-group",
});
 
export const options = {
  scenarios: {
    kafka_producer: {
      executor: "constant-arrival-rate",
      rate: 100, // 100 сообщений/сек
      duration: "5m",
      preAllocatedVUs: 10,
      maxVUs: 50,
      exec: "produceMessages",
    },
    kafka_consumer: {
      executor: "constant-vus",
      vus: 5,
      duration: "5m",
      exec: "consumeMessages",
    },
  },
};
 
export function produceMessages() {
  const message = {
    key: `key-${Date.now()}`,
    value: JSON.stringify({
      userId: "user-123",
      action: "page_view",
      timestamp: Date.now(),
    }),
  };
 
  const error = writer.produce({ messages: [message] });
 
  check(error, {
    "message produced": (err) => err === undefined,
  });
}
 
export function consumeMessages() {
  const messages = reader.consume({ limit: 10 });
 
  check(messages, {
    "messages consumed": (msgs) => msgs.length > 0,
  });
 
  for (let message of messages) {
    const data = JSON.parse(message.value);
    check(data, {
      "has userId": (d) => d.userId !== undefined,
      "has action": (d) => d.action !== undefined,
    });
  }
}
 
export function teardown() {
  writer.close();
  reader.close();
}

Популярные xk6 расширения

РасширениеОписаниеРепозиторий
xk6-kafkaApache Kafka producer/consumergithub.com/grafana/xk6-kafka
xk6-redisRedis commandsgithub.com/grafana/xk6-redis
xk6-sqlSQL databases (Postgres, MySQL)github.com/grafana/xk6-sql
xk6-mqttMQTT protocol для IoTgithub.com/pmalhaire/xk6-mqtt
xk6-browserBrowser automation (headless Chrome)github.com/grafana/xk6-browser
xk6-output-prometheus-remotePrometheus remote writegithub.com/grafana/xk6-output-prometheus-remote

Когда применять xk6

Используйте xk6 если:

  • Нужен протокол, которого нет в k6 core (Kafka, Redis, MQTT)
  • Тестируете message queues или databases напрямую
  • Нужны специфичные output backends (InfluxDB, Prometheus, S3)

Не используйте xk6 если:

  • Протокол есть в core (HTTP, gRPC, WebSocket)
  • Можете протестировать через HTTP API вместо прямого протокола
  • Команда не готова поддерживать кастомные бинари

Управление версиями xk6-сборок

# Создайте Dockerfile для reproducible builds
FROM golang:1.21 as builder
RUN go install go.k6.io/xk6/cmd/xk6@latest
RUN xk6 build v0.48.0 \
    --with github.com/grafana/xk6-kafka@v0.21.0 \
    --with github.com/grafana/xk6-redis@v0.2.0
 
FROM alpine:3.18
COPY --from=builder /go/k6 /usr/local/bin/k6
ENTRYPOINT ["k6"]

Важно: При обновлении k6 нужно пересобирать xk6-бинарь. Храните версии расширений в Dockerfile/Makefile. Тестируйте совместимость перед production.

Помечайте в документации: что core, что experimental, что extension-only. Это экономит часы расследований «почему пример из интернета не работает».

Практика: что сделать до следующего урока

1.Протестируйте WebSocket (если есть в вашем проекте)

  • Найдите WebSocket endpoint в вашем приложении (или используйте wss://echo.websocket.org)
  • Напишите тест с ws.connect и отправкой ping/pong сообщений
  • Добавьте кастомные метрики: ws_connections, ws_messages_received, ws_message_latency
  • Запустите с 50 VU на 5 минут, проверьте стабильность соединений
  • В summary найдите: сколько сообщений получено, какая latency p95

2.Протестируйте gRPC (если есть в вашем проекте)

  • Возьмите .proto файл из вашего микросервиса
  • Напишите k6-тест с client.load() и client.invoke()
  • Добавьте per-method thresholds: grpc_req_duration{method:YourMethod}
  • Запустите с constant-arrival-rate: 100 RPS
  • Сравните latency gRPC vs HTTP REST для аналогичного endpoint

3.Соберите xk6 с расширением (опционально)

  • Установите Go (если нет): https://go.dev/dl/
  • Установите xk6: go install go.k6.io/xk6/cmd/xk6@latest
  • Соберите k6 с xk6-kafka или xk6-redis
  • Напишите простой тест для отправки сообщения в Kafka/запись в Redis
  • Убедитесь, что кастомный бинарь работает: ./k6 run test.js

✅ Чек-лист завершения урока

После этого урока вы должны уметь:

WebSocket тестирование:

  • Использовать ws.connect() для подключения к WebSocket
  • Обрабатывать события: open, message, error, close
  • Создавать кастомные метрики: ws_connections, ws_messages_received, ws_message_latency
  • Измерять round-trip latency (ping → pong)

gRPC тестирование:

  • Загружать .proto файлы через client.load()
  • Вызывать методы через client.invoke()
  • Создавать per-method thresholds: grpc_req_duration{method:AddItem}
  • Обрабатывать ошибки gRPC (grpc.StatusOK, error codes)

Расширения xk6:

  • Понимать, когда нужны расширения (Kafka, Redis, SQL)
  • Собирать кастомный бинарь k6 с xk6
  • Использовать xk6-kafka для тестирования очередей
  • Использовать xk6-redis для тестирования кэша

Практическое задание:

  • Если есть WebSocket в проекте — напишите тест с ping/pong
  • Если есть gRPC — напишите тест с client.invoke()
  • Опционально: соберите xk6 с расширением для вашего стека

Если чек-лист пройден — переходите к уроку 15: научимся масштабировать генераторы в Kubernetes.

WebSockets и gRPC в k6 — k6: нагрузочное тестирование как система — Potapov.me