Первая async-гонка: промокод применяется дважды
Этот урок объединяет знания из предыдущих трёх: test isolation (урок 0), race conditions (урок 1), async фикстуры (урок 2). Теперь ловим реальную async гонку в приложении.
Главный фокус — научиться воспроизводить и фиксить race conditions в async коде с помощью asyncio.Lock.
Production Issue #1247
Репорт заказчика:
"При массовом добавлении задач через API некоторые задачи пропадают из списка. В логах видим конфликты записи в кеш. Проблема воспроизводится нестабильно, примерно в 15% запусков при нагрузке."
Root cause: Race condition при конкурентной записи в файловый кеш без синхронизации.
Что такое flaky-тест (профессиональное определение)
Flaky-тест — тест, который при идентичных входных данных и коде демонстрирует недетерминированное поведение. Основные причины в реальных системах:
- Race conditions — конкурентный доступ к shared state без синхронизации
- Test pollution — утечка состояния между тестами
- Timing issues — недостаточные таймауты в асинхронном коде
- External dependencies — нестабильность сети, БД, очередей
- Resource leaks — незакрытые соединения, файловые дескрипторы
В этом уроке воспроизводим race condition (причина #1) — самую частую проблему в высоконагруженных системах.
Подготовка
Делаем низкий порог входа: сначала — только то, что нужно для воспроизведения гонки. Набор observability-инструментов подключаем позже и только если захотите.
Базовый старт (достаточно, чтобы увидеть гонку и написать фикс):
git clone https://github.com/potapov-me/pytest-from-zero-to-confidence.git
cd pytest-from-zero-to-confidence
git checkout fixed
pip install -e '.[test]' pytest-repeat
pytest -q # убеждаемся, что база зелёнаяОпционально для бонусных частей:
pip install pytest-xdist # параллельный запуск (-n)
pip install opentelemetry-api opentelemetry-sdk prometheus-client # метрики и трейсыЕсли цель — только понять race condition и научиться её фиксить, вторую группу пакетов ставить не нужно. К observability вернёмся в диагностике (Шаг 3) как к необязательному, но полезному бонусу.
Шаг 1. Воспроизводим production bug
Создайте tests/integration/test_concurrent_cache_race.py. Мы воспроизводим реальную гонку: конкурентная запись в файловый кеш без блокировок.
import concurrent.futures
import json
import time
import pytest
from pathlib import Path
from src.app import TodoApp
class RacySharedFileCache:
"""
Кеш с РЕАЛЬНОЙ race condition: все записи в один общий файл.
Проблема: read-modify-write без блокировки приводит к потере данных
при конкурентном доступе.
"""
def __init__(self, storage_path):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self.cache_file = self.storage_path / "cache.json"
# Инициализируем пустой кеш
if not self.cache_file.exists():
self.cache_file.write_text(json.dumps({}))
def set(self, key: str, value: dict) -> None:
"""
ОПАСНАЯ РЕАЛИЗАЦИЯ без синхронизации!
Race condition возникает здесь:
1. Thread A читает {"task_1": {...}}
2. Thread B читает {"task_1": {...}}
3. Thread A записывает {"task_1": {...}, "task_2": {...}}
4. Thread B записывает {"task_1": {...}, "task_3": {...}} <- перезаписывает task_2!
"""
# ШАГ 1: Читаем текущее состояние
current_data = json.loads(self.cache_file.read_text())
# КРИТИЧЕСКАЯ СЕКЦИЯ БЕЗ БЛОКИРОВКИ!
# Имитируем задержку для усиления race condition
# (в проде: сетевой latency, disk I/O, GC pause, context switch)
time.sleep(0.05)
# ШАГ 2: Модифицируем данные
current_data[key] = value
# ШАГ 3: Записываем обратно (может перезаписать чужие изменения!)
self.cache_file.write_text(json.dumps(current_data, indent=2))
def invalidate(self, key):
"""Удаление с той же проблемой"""
current_data = json.loads(self.cache_file.read_text())
time.sleep(0.01)
current_data.pop(key, None)
self.cache_file.write_text(json.dumps(current_data))
def count_keys(self) -> int:
"""Подсчёт записей для проверки потерь"""
data = json.loads(self.cache_file.read_text())
return len(data)
@pytest.fixture
def racy_app(tmp_path):
"""Приложение с race-prone кешем"""
cache = RacySharedFileCache(storage_path=tmp_path / "cache")
# Remote mock для логирования
class RemoteMock:
def sync_task(self, task):
pass
def log(self, event, **kwargs):
pass
app = TodoApp(cache=cache, remote=RemoteMock())
return app
@pytest.mark.xfail(reason="Race condition not fixed yet; remove after implementing locking")
def test_concurrent_writes_cause_data_loss(racy_app):
"""
Воспроизводим ISSUE #1247: при параллельных записях последний перезаписывает всё.
Сценарий:
1. 10 потоков одновременно читают cache.json (пустой или частично заполненный)
2. Каждый добавляет свою задачу в локальную копию словаря
3. Каждый записывает свою версию обратно
4. Результат: последний writer wins, остальные задачи ПОТЕРЯНЫ
"""
app = racy_app
errors = []
def add_task_wrapper(i):
try:
return app.add_task(f"Task {i}")
except Exception as e:
errors.append(e)
return None
# Запускаем 10 параллельных операций одновременно
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(add_task_wrapper, i) for i in range(10)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
# Ожидаем: 10 успешных операций без исключений
successful = [r for r in results if r is not None]
assert len(errors) == 0, f"Got {len(errors)} errors during concurrent writes: {errors}"
assert len(successful) == 10, f"Expected 10 successful writes, got {len(successful)}"
# КРИТИЧЕСКАЯ ПРОВЕРКА: все ли задачи попали в кеш?
cached_count = app.cache.count_keys()
assert cached_count == 10, (
f"Race condition detected: {10 - cached_count} tasks lost! "
f"Expected 10 cached tasks, got only {cached_count}. "
f"This is the classic read-modify-write race condition."
)Запуск:
pytest tests/integration/test_concurrent_cache_race.py --count=5 -vОжидаемый результат: Тест падает с ошибкой "Race condition detected: X tasks lost" (обычно 7-9 задач из 10 теряются, остаётся только 1-3).
Почему именно так работает гонка:
- Все 10 потоков одновременно читают
cache.json(пустой{}) - Каждый поток добавляет свою задачу в локальную копию:
{"task_X": {...}} - Все 10 потоков пытаются записать обратно, но последний writer wins
- Результат: в файле остаётся только 1-3 задачи вместо 10
Если тест всё равно проходит (rare case):
- Увеличьте задержку:
time.sleep(0.1)вместо0.05 - Увеличьте количество потоков:
max_workers=20иrange(20) - Запустите повторно:
--count=10вместо--count=5
В production эта задержка не нужна — race condition возникает естественно при высокой нагрузке из-за context switches и I/O latency.
Шаг 2. Быстрый фикс: минимальная блокировка
Начинаем с самого простого: защищаем read-modify-write одним threading.Lock внутри процесса. Это убирает гонку в dev/CI и в однопроцессных сервисах без сторонних зависимостей.
# src/cache.py
import json
import threading
from pathlib import Path
class ThreadSafeCacheWithLock:
"""
Минимальный фикс гонки: один лок вокруг read-modify-write.
Подходит для однопроцессного запуска (dev/CI, uvicorn --reload).
"""
def __init__(self, storage_path):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self.cache_file = self.storage_path / "cache.json"
self._lock = threading.Lock()
if not self.cache_file.exists():
self.cache_file.write_text(json.dumps({}))
def set(self, key: str, value: dict) -> None:
with self._lock:
current_data = json.loads(self.cache_file.read_text())
current_data[key] = value
self.cache_file.write_text(json.dumps(current_data, indent=2))
def invalidate(self, key):
with self._lock:
current_data = json.loads(self.cache_file.read_text())
current_data.pop(key, None)
self.cache_file.write_text(json.dumps(current_data))
def count_keys(self) -> int:
with self._lock:
data = json.loads(self.cache_file.read_text())
return len(data)Когда этого достаточно: один процесс, нет gunicorn/uwsgi с несколькими воркерами, нужно быстро убрать гонку в тестах.
Чего это не решает: межпроцессные гонки и распределённые блокировки — см. продвинутые варианты в Шаге 4.
Шаг 3. Углубленная диагностика (по желанию)
Если хотите понять механику гонки глубже, разберите реальный сценарий без искусственных задержек и подсветите последовательность операций простыми логами. Этот шаг можно пропустить, если быстрого фикса достаточно.
Реальный сценарий без искусственного sleep
time.sleep здесь только усиливает гонку на локальной машине. В реальных системах задержки появляются сами: сетевые round-trip'ы между сервисами, истощённый connection pool, очередь Redis/DB, блокировки на уровне транзакций.
Пример боевой гонки: два пользователя одновременно завершают задачу и оба получают бонус.
async def complete_task(task_id, user_id):
task = await db.get_task(task_id) # SELECT
if not task.completed: # <- критическая секция
await award_bonus(user_id) # бонус может начислиться дважды
task.completed = True
await db.save_task(task) # UPDATEКак протестировать без искусственных sleep:
import asyncio
import pytest
@pytest.mark.asyncio
async def test_bonus_awarded_once(async_db, bonus_service):
task_id = await async_db.create_task({"text": "Pay salaries"})
async def complete(uid):
await complete_task(task_id, uid)
# Два конкурирующих запроса как от реальных пользователей
await asyncio.gather(complete("u1"), complete("u2"))
bonuses = await bonus_service.count_awards(task_id)
assert bonuses == 1, "Бонус выдан больше одного раза"
task = await async_db.get_task(task_id)
assert task.completed is TrueЧтобы гонка сработала стабильнее:
- запустите тест с
--count=20и/или-n 4(нуженpytest-xdist); - используйте БД с слабой изоляцией (SQLite WAL или Postgres READ COMMITTED);
- уберите уникальные ограничения на уровне схемы, если хотите поймать проблему именно тестом.
Дальнейший фикс будет тем же: защищённая критическая секция (транзакция + SELECT ... FOR UPDATE, redis-lock, advisory lock) и проверка, что бонус выдан ровно один раз.
Быстрая диагностика без observability: debug-лог
Если хочется увидеть гонку «как есть», добавьте простые логи — этого достаточно, чтобы понять последовательность операций и не тащить продовый стек мониторинга.
import json
import threading
def set(self, key, value):
print(f"[{threading.current_thread().name}] READING cache")
current_data = json.loads(self.cache_file.read_text())
print(f"[{threading.current_thread().name}] MODIFYING cache")
current_data[key] = value
print(f"[{threading.current_thread().name}] WRITING cache")
self.cache_file.write_text(json.dumps(current_data))Запустите тест с --count=5 и посмотрите в консоль: увидите перемешанные READING/MODIFYING/WRITING от разных потоков — это и есть гонка. Этого минимума достаточно, чтобы понять проблему и переходить к фиксам.
Observability (бонус, опционально)
Эта часть опциональна: сначала воспроизведите гонку и попытайтесь её исправить без доп-инструментов. Если ваша цель — только разобраться с race condition, этот шаг можно пропустить. Если хотите прокачать наблюдаемость, вернитесь сюда — понадобятся пакеты из «Опционально для бонусных частей».
Добавляем observability в src/app.py:
# src/app.py
import uuid
from opentelemetry import trace
from prometheus_client import Counter, Histogram
# Метрики
cache_operations = Counter(
'cache_operations_total',
'Total cache operations',
['operation', 'status']
)
cache_operation_duration = Histogram(
'cache_operation_duration_seconds',
'Cache operation duration',
['operation']
)
tracer = trace.get_tracer(__name__)
class TodoApp:
def __init__(self, cache, remote):
self.cache = cache
self.remote = remote
self.tasks = {}
def add_task(self, text: str) -> str:
"""Добавить задачу с observability"""
with tracer.start_as_current_span("add_task") as span:
task_id = str(uuid.uuid4())
span.set_attribute("task.id", task_id)
span.set_attribute("task.text_length", len(text))
task = {"id": task_id, "text": text.strip(), "done": False}
if not task["text"]:
raise ValueError("Task text is empty")
# Сохраняем в memory storage
self.tasks[task_id] = task
# Пытаемся записать в кеш с observability
try:
with cache_operation_duration.labels(operation='set').time():
self.cache.set(f"task_{task_id}", task)
cache_operations.labels(operation='set', status='success').inc()
span.set_attribute("cache.written", True)
except Exception as exc:
cache_operations.labels(operation='set', status='error').inc()
span.set_attribute("cache.error", True)
span.record_exception(exc)
# ВАЖНО: не падаем, продолжаем работу без кеша
self.remote.log(
"cache_write_failed",
task_id=task_id,
error=str(exc),
error_type=type(exc).__name__
)
# Синхронизация с remote (уже есть в оригинале)
try:
self.remote.sync_task(task)
except Exception:
pass
return task_idКак увидеть метрики и трейсы в действии
Добавьте в тест экспорт метрик и настройку трейсинга:
# tests/integration/test_concurrent_cache_race.py
import concurrent.futures
import json
import time
import pytest
from pathlib import Path
from src.app import TodoApp
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from prometheus_client import REGISTRY
# Настройка OpenTelemetry для вывода трейсов в консоль
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)
class RacySharedFileCache:
# ... (код из предыдущего примера)
def test_concurrent_writes_with_observability(racy_app):
"""Тест с выводом метрик и трейсов"""
app = racy_app
# Запускаем тест
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(app.add_task, f"Task {i}") for i in range(10)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
# ВЫВОДИМ МЕТРИКИ В КОНСОЛЬ
print("\n=== PROMETHEUS METRICS ===")
for metric in REGISTRY.collect():
if metric.name.startswith('cache_'):
print(f"\n{metric.name}:")
for sample in metric.samples:
print(f" {sample.name}{sample.labels} = {sample.value}")
# Проверяем потери
cached_count = app.cache.count_keys()
print(f"\n=== RACE CONDITION RESULT ===")
print(f"Expected: 10 tasks")
print(f"Got: {cached_count} tasks")
print(f"Lost: {10 - cached_count} tasks")Запуск с выводом метрик:
pytest tests/integration/test_concurrent_cache_race.py -v -sЧто вы увидите в консоли:
=== OPENTELEMETRY TRACES ===
{
"name": "add_task",
"context": {...},
"attributes": {
"task.id": "abc-123",
"task.text_length": 7,
"cache.written": true
}
}
=== PROMETHEUS METRICS ===
cache_operations_total:
cache_operations_total{operation="set",status="success"} = 3.0
cache_operations_total{operation="set",status="error"} = 0.0
cache_operation_duration_seconds_count:
cache_operation_duration_seconds_count{operation="set"} = 3.0
cache_operation_duration_seconds_sum:
cache_operation_duration_seconds_sum{operation="set"} = 0.156
=== RACE CONDITION RESULT ===
Expected: 10 tasks
Got: 3 tasks # ← Race condition!
Lost: 7 tasks # ← Вот откуда видно проблемуProduction-ready мониторинг (бонус)
В production метрики отправляются в Prometheus, трейсы — в Jaeger/Tempo:
# conftest.py (для production setup)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from prometheus_client import start_http_server
@pytest.fixture(scope="session", autouse=True)
def setup_observability():
# Prometheus metrics server
start_http_server(8000) # http://localhost:8000/metrics
# OTLP exporter для Jaeger
otlp_exporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(otlp_exporter)
)Теперь при запуске тестов:
- Metrics:
curl http://localhost:8000/metrics— видите Prometheus metrics - Traces: Открываете Jaeger UI (
http://localhost:16686) — видите распределённые трейсы - Logs: В консоли pytest видите structured logs
Шаг 4. Продвинутые решения (production-ready)
Вариант 1: File-based locking (простой, надёжный)
# src/cache.py
import fcntl
import json
from pathlib import Path
class CacheError(Exception):
pass
class ThreadSafeSharedFileCache:
"""
Кеш с file locking для защиты от race condition.
Все данные в одном файле, защищённом fcntl блокировкой.
"""
def __init__(self, storage_path):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self.cache_file = self.storage_path / "cache.json"
self.lock_file = self.storage_path / ".cache.lock"
# Инициализируем пустой кеш
if not self.cache_file.exists():
self.cache_file.write_text(json.dumps({}))
def set(self, key: str, value: dict) -> None:
"""Thread-safe и process-safe запись с file locking"""
with open(self.lock_file, 'w') as lock_fd:
try:
# Эксклюзивная блокировка (работает между процессами!)
fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)
# КРИТИЧЕСКАЯ СЕКЦИЯ ЗАЩИЩЕНА
# Read-modify-write атомарно
current_data = json.loads(self.cache_file.read_text())
current_data[key] = value
self.cache_file.write_text(json.dumps(current_data, indent=2))
except Exception as e:
raise CacheError(f"Failed to set cache: {e}")
finally:
# Освобождаем блокировку
fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
def invalidate(self, key):
"""Thread-safe удаление"""
with open(self.lock_file, 'w') as lock_fd:
try:
fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)
current_data = json.loads(self.cache_file.read_text())
current_data.pop(key, None)
self.cache_file.write_text(json.dumps(current_data))
except Exception as e:
raise CacheError(f"Failed to invalidate cache: {e}")
finally:
fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
def count_keys(self) -> int:
"""Подсчёт записей (для тестов)"""
data = json.loads(self.cache_file.read_text())
return len(data)Плюсы: Работает между процессами (multi-process safe), надёжно Минусы: Медленнее threading.Lock из-за syscall overhead
Вариант 2: Threading Lock (см. быстрый фикс)
Минимальный внутрипроцессный вариант уже показан в Шаге 2. Используйте его, если у вас один процесс и важны низкие накладные расходы. Для multi-process и распределённых сценариев смотрите Вариант 1 и 3.
Вариант 3: Circuit Breaker для graceful degradation
# src/cache.py
import time
from typing import Optional
class CacheWithCircuitBreaker:
"""
Оборачиваем кеш в circuit breaker для graceful degradation.
Если кеш падает слишком часто — отключаем его автоматически.
"""
def __init__(self, cache, remote, failure_threshold: int = 5, timeout: int = 60):
self.cache = cache
self.remote = remote
self.failure_threshold = failure_threshold
self.timeout = timeout # Время до попытки восстановления (секунды)
self.failure_count = 0
self.state = "closed" # closed (работает) | open (отключен) | half-open (проверка)
self.last_failure_time = None
def set(self, key: str, value: dict) -> None:
"""Thread-safe запись с circuit breaker"""
if self.state == "open":
# Проверяем, не пора ли попробовать восстановиться
if time.time() - self.last_failure_time > self.timeout:
self.state = "half-open"
else:
# Circuit open — пропускаем кеш полностью
self.remote.log("cache_circuit_open", key=key)
return
try:
self.cache.set(key, value)
self._on_success()
except Exception as exc:
self._on_failure()
if self.state == "open":
# Circuit открылся — логируем для алертинга
self.remote.log(
"cache_circuit_opened",
failure_count=self.failure_count,
threshold=self.failure_threshold
)
# НЕ пробрасываем исключение — graceful degradation
self.remote.log("cache_write_failed", key=key, error=str(exc))
def invalidate(self, key):
"""Thread-safe удаление с circuit breaker"""
if self.state == "open":
return
try:
self.cache.invalidate(key)
self._on_success()
except Exception as exc:
self._on_failure()
def _on_success(self):
"""Успешная операция — сбрасываем счётчик"""
self.failure_count = 0
if self.state == "half-open":
self.state = "closed"
self.remote.log("cache_circuit_closed")
def _on_failure(self):
"""Неудачная операция — увеличиваем счётчик"""
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = "open"
self.last_failure_time = time.time()Production best practice: Комбинируйте Вариант 1 (file locking) + Вариант 3 (circuit breaker):
# Итоговая производственная реализация
cache_with_locking = ThreadSafeSharedFileCache(storage_path=Path("/var/cache/app"))
cache_with_protection = CacheWithCircuitBreaker(
cache=cache_with_locking,
remote=remote_logger,
failure_threshold=5
)
app = TodoApp(cache=cache_with_protection, remote=remote_logger)Шаг 5. Проверяем фикс
Выберите один из вариантов (рекомендуем #1 или #2) и обновите тест:
import concurrent.futures
import pytest
from src.app import TodoApp
from src.cache import ThreadSafeSharedFileCache # или ThreadSafeCacheWithLock
class RemoteMock:
def sync_task(self, task):
pass
def log(self, event, **kwargs):
pass
def test_concurrent_writes_are_safe_with_locking(tmp_path):
"""
После фикса: конкурентные записи безопасны благодаря locking механизму
"""
# Используем thread-safe кеш
cache = ThreadSafeSharedFileCache(storage_path=tmp_path / "cache")
app = TodoApp(cache=cache, remote=RemoteMock())
def add_task_wrapper(i):
return app.add_task(f"Task {i}")
# Запускаем 20 параллельных операций (увеличиваем нагрузку)
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
futures = [executor.submit(add_task_wrapper, i) for i in range(20)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
# Все операции должны завершиться успешно
assert len(results) == 20, f"Expected 20 tasks, got {len(results)}"
# Все задачи должны быть в кеше (проверяем через count_keys)
cached_count = cache.count_keys()
assert cached_count == 20, f"Expected 20 cached tasks, got {cached_count}"Запуск с повышенной нагрузкой:
# Последовательные повторы
pytest tests/integration/test_concurrent_cache_race.py --count=20 -v
# Параллельные воркеры
pytest tests/integration/test_concurrent_cache_race.py -n 4 --dist loadfile --count=10Для флага
-nнуженpytest-xdist. Если не ставили — запускайте без-n/--distили установите плагин из блока «Опционально».
Ожидаемый результат: Все тесты зелёные, без потери данных.
Шаг 6. Load testing для уверенности
После фикса важно проверить поведение под реальной нагрузкой:
# tests/stress/test_cache_under_load.py
import concurrent.futures
import time
import pytest
from src.app import TodoApp
from src.cache import ThreadSafeSharedFileCache
class RemoteMock:
def sync_task(self, task):
pass
def log(self, event, **kwargs):
pass
@pytest.mark.stress
def test_cache_performance_under_load(tmp_path):
"""
Стресс-тест: измеряем latency и throughput под нагрузкой
"""
cache = ThreadSafeSharedFileCache(storage_path=tmp_path / "cache")
app = TodoApp(cache=cache, remote=RemoteMock())
latencies = []
def measure_latency(i: int):
start = time.time()
app.add_task(f"Load test task {i}")
return time.time() - start
# 100 параллельных операций
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(measure_latency, i) for i in range(100)]
latencies = [f.result() for f in concurrent.futures.as_completed(futures)]
# SLA requirements
sorted_latencies = sorted(latencies)
p50 = sorted_latencies[len(sorted_latencies) // 2]
p95 = sorted_latencies[int(len(sorted_latencies) * 0.95)]
p99 = sorted_latencies[int(len(sorted_latencies) * 0.99)]
print(f"\nLatency p50: {p50*1000:.2f}ms, p95: {p95*1000:.2f}ms, p99: {p99*1000:.2f}ms")
# Проверяем, что все 100 задач записаны
assert cache.count_keys() == 100, "Some tasks were lost during load test"
# Assertions based on production SLA
assert p50 < 0.1, f"p50 latency {p50*1000:.2f}ms exceeds 100ms SLA"
assert p95 < 0.2, f"p95 latency {p95*1000:.2f}ms exceeds 200ms SLA"
assert p99 < 0.5, f"p99 latency {p99*1000:.2f}ms exceeds 500ms SLA"Запуск стресс-тестов:
pytest tests/stress/ -v -m stressProduction checklist
После фикса race condition убедитесь, что:
- Observability: Metrics, traces, structured logs работают
- Monitoring: Dashboards показывают cache hit rate, error rate, latency percentiles
- Alerting: Alerts настроены на circuit breaker open, high error rate
- Load testing: Система выдерживает peak load (измерено в QA environment)
- Rollback plan: Canary deployment с автоматическим rollback по метрикам
- Documentation: Runbook для on-call engineer при cache degradation
Что вынести с урока
- Flaky-тесты — симптом race conditions, не "рандомный баг"
- Профессиональная диагностика требует observability: traces, metrics, logs
- Production-ready фикс включает: thread-safe locking, circuit breaker, graceful degradation
- Тестирование concurrency: используйте
ThreadPoolExecutor,pytest-xdist, stress tests - SLA-driven development: определяйте latency thresholds (p50/p95/p99) и проверяйте их в тестах
Главное отличие от "учебных примеров":
- ❌ Не используем
random.random()для имитации проблем - ✅ Воспроизводим реальные race conditions через конкурентный доступ
- ✅ Показываем production-ready решения с первой строки
- ✅ Интегрируем observability в основной код, а не в комментарии
В следующем уроке: Асинхронные тесты с pytest-asyncio и обнаружение deadlocks в конкурентном коде.