Перейти к содержимому
К программе курса
Pytest: Борьба с Race Conditions в Async-коде
2 / 922%

Переход на async: async/await фикстуры

40 минут

Этот урок продолжает предыдущие два: мы уже знаем test isolation из урока 0 и race conditions из урока 1. Теперь переходим на async/await и настраиваем async фикстуры для PostgreSQL.

Главный фокус — test isolation: научиться писать async фикстуры, которые не создают гонок и утечек состояния между тестами.

Production Issue #1891

Репорт от CI:

"Integration tests randomly fail with asyncpg.UniqueViolationError: duplicate key value violates unique constraint. Failure rate: 12-18% при запуске с -n 4 (parallel workers). При последовательном запуске всё работает. Блокирует deployment уже 3 дня."

Симптомы:

  • pytest tests/integration/ — PASS (100%)
  • pytest tests/integration/ -n 4 — FAIL (12-18%)
  • pytest tests/integration/ --random-order — FAIL (5-10%)

Root cause: Test pollution — тесты не изолированы, shared state в PostgreSQL переживает между запусками. Нет транзакций с rollback.

Стоимость инцидента:

  • CI blocked: 3 дня
  • Engineering investigation: 8 часов
  • Workaround: отключили parallel tests (CI стал 4x медленнее)
  • Total impact: ~$5,000 + 30 минут задержки на каждый deployment

Что такое test isolation (профессиональное определение)

Test isolation — гарантия что каждый тест запускается в чистом окружении без побочных эффектов от предыдущих тестов.

Основные источники утечек в асинхронном коде:

  1. Database state — данные в PostgreSQL/MySQL остаются после теста
  2. Cache state — Redis/Memcached не очищается
  3. Connection leaks — незакрытые async connections
  4. Background tasks — asyncio tasks продолжают работать
  5. Shared async resources — event loops, locks, queues

В этом уроке фокусируемся на #1 (database state) и #2 (cache state) — самые частые проблемы в async приложениях.

Подготовка

Расширяем проект из урока 00. Теперь добавляем настоящие БД вместо файлового кеша.

Установка зависимостей:

cd pytest-from-zero-to-confidence
git checkout lesson-01-async-fixtures  # или создайте свою ветку от fixed
 
# Устанавливаем async драйверы
pip install asyncpg aioredis pytest-asyncio
 
# Запускаем PostgreSQL и Redis (Docker)
docker run -d --name pytest-postgres \
  -e POSTGRES_PASSWORD=testpass \
  -e POSTGRES_DB=todo_test \
  -p 5432:5432 \
  postgres:15-alpine
 
docker run -d --name pytest-redis \
  -p 6379:6379 \
  redis:7-alpine
 
# Проверяем что всё работает
docker ps  # Должны видеть оба контейнера

Минимальная схема БД:

-- schema.sql
CREATE TABLE IF NOT EXISTS tasks (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(36) UNIQUE NOT NULL,
    text VARCHAR(500) NOT NULL,
    done BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT NOW()
);
 
CREATE INDEX idx_tasks_task_id ON tasks(task_id);

Применяем схему:

docker exec -i pytest-postgres psql -U postgres -d todo_test < schema.sql

Настройка pytest для async:

# pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"  # Автоматически определяет async тесты
markers = [
    "asyncio: async tests",
    "integration: integration tests with real DB"
]

Шаг 1. Воспроизводим production bug

Создайте tests/integration/test_async_fixtures_naive.py. Мы намеренно создаём наивную реализацию БЕЗ транзакций, чтобы увидеть проблему.

import asyncio
import asyncpg
import pytest
from src.app import TodoApp
 
 
class NaiveAsyncDBStorage:
    """
    ПЛОХАЯ реализация: нет изоляции между тестами!
 
    Проблема: все тесты работают с одной БД, данные остаются после теста.
    Результат: duplicate key violations при параллельном запуске.
    """
 
    def __init__(self, pool):
        self.pool = pool
 
    async def save_task(self, task_id: str, text: str) -> None:
        async with self.pool.acquire() as conn:
            await conn.execute(
                "INSERT INTO tasks (task_id, text) VALUES ($1, $2)",
                task_id, text
            )
 
    async def get_task(self, task_id: str) -> dict:
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT task_id, text, done FROM tasks WHERE task_id = $1",
                task_id
            )
            if not row:
                raise KeyError(f"Task {task_id} not found")
            return dict(row)
 
    async def list_tasks(self) -> list[dict]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("SELECT task_id, text, done FROM tasks")
            return [dict(row) for row in rows]
 
 
class NaiveRedisCache:
    """
    ПЛОХАЯ реализация: нет cleanup между тестами!
 
    Проблема: Redis сохраняет данные между тестами.
    """
 
    def __init__(self, redis):
        self.redis = redis
 
    async def set(self, key: str, value: dict) -> None:
        import json
        await self.redis.set(key, json.dumps(value))
 
    async def get(self, key: str) -> dict | None:
        import json
        value = await self.redis.get(key)
        return json.loads(value) if value else None
 
 
# НАИВНЫЕ ФИКСТУРЫ БЕЗ ИЗОЛЯЦИИ
 
@pytest.fixture(scope="session")
async def db_pool():
    """
    Session-scoped pool: создаётся один раз на всю тестовую сессию.
 
    ПРОБЛЕМА: все тесты используют одну БД без очистки!
    """
    pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        user="postgres",
        password="testpass",
        database="todo_test",
        min_size=2,
        max_size=10
    )
    yield pool
    await pool.close()
 
 
@pytest.fixture(scope="session")
async def redis_client():
    """
    Session-scoped Redis: создаётся один раз.
 
    ПРОБЛЕМА: все тесты видят данные друг друга!
    """
    import aioredis
    redis = await aioredis.from_url(
        "redis://localhost:6379",
        decode_responses=True
    )
    yield redis
    await redis.close()
 
 
@pytest.fixture
async def naive_app(db_pool, redis_client):
    """
    Приложение с наивными (плохими) фикстурами.
 
    НЕТ очистки БД, НЕТ транзакций, НЕТ rollback!
    """
    db = NaiveAsyncDBStorage(db_pool)
    cache = NaiveRedisCache(redis_client)
 
    # Remote mock для совместимости с TodoApp
    class RemoteMock:
        def sync_task(self, task):
            pass
        def log(self, event, **kwargs):
            pass
 
    app = TodoApp(cache=cache, remote=RemoteMock())
    app.db = db  # Добавляем DB к приложению
    return app
 
 
# ТЕСТЫ КОТОРЫЕ ПОКАЗЫВАЮТ ПРОБЛЕМУ
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_task_naive_1(naive_app):
    """
    Тест #1: создаём задачу с task_id="test-task-1"
    """
    task_id = "test-task-1"
    await naive_app.db.save_task(task_id, "Buy milk")
 
    task = await naive_app.db.get_task(task_id)
    assert task["text"] == "Buy milk"
    assert task["done"] is False
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_task_naive_2(naive_app):
    """
    Тест #2: создаём задачу с ТАКИМ ЖЕ task_id="test-task-1"
 
    При последовательном запуске: PASS (БД очищается между pytest sessions)
    При параллельном запуске: FAIL (duplicate key violation)
    """
    task_id = "test-task-1"  # Конфликт с test #1!
    await naive_app.db.save_task(task_id, "Buy milk")
 
    task = await naive_app.db.get_task(task_id)
    assert task["text"] == "Buy milk"
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_list_tasks_sees_pollution(naive_app):
    """
    Тест #3: проверяем сколько задач в БД
 
    Ожидание: 0 задач (чистая БД)
    Реальность: видим задачи из предыдущих тестов!
    """
    tasks = await naive_app.db.list_tasks()
 
    # БАГ: видим задачи от других тестов!
    # При последовательном запуске: может быть 0 или больше (зависит от порядка)
    # При параллельном: точно будут задачи от других воркеров
    assert len(tasks) == 0, (
        f"Test pollution detected! Found {len(tasks)} tasks from previous tests. "
        f"Tasks: {[t['task_id'] for t in tasks]}"
    )
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_redis_cache_pollution(naive_app):
    """
    Тест #4: проверяем что Redis тоже не изолирован
    """
    # Добавляем что-то в кеш
    await naive_app.cache.set("test-key", {"value": "test-data"})
 
    # В следующем тесте этот ключ всё ещё будет!
    cached = await naive_app.cache.get("test-key")
    assert cached is not None
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_concurrent_inserts_cause_conflicts(naive_app):
    """
    Тест #5: параллельные вставки с одинаковыми ID
 
    Воспроизводим реальную проблему CI: несколько воркеров
    пытаются создать задачу с одинаковым ID.
    """
    task_id = "concurrent-task"
 
    async def create_task():
        await naive_app.db.save_task(task_id, "Test task")
 
    # Пытаемся создать 5 раз параллельно
    # Если нет транзакций — все 5 запросов попадут в БД
    # Один успешен, 4 упадут с UniqueViolationError
    with pytest.raises(asyncpg.UniqueViolationError):
        await asyncio.gather(
            create_task(),
            create_task(),
            create_task(),
            create_task(),
            create_task(),
        )

Запуск и проверка проблемы:

# 1. Последовательный запуск (может пройти или упасть — недетерминированно)
pytest tests/integration/test_async_fixtures_naive.py -v
 
# 2. Параллельный запуск (скорее всего упадёт)
pytest tests/integration/test_async_fixtures_naive.py -n 4 -v
 
# 3. Random order (точно упадёт)
pytest tests/integration/test_async_fixtures_naive.py --random-order -v
 
# 4. Повторные запуски (flaky behaviour)
pytest tests/integration/test_async_fixtures_naive.py --count=10 -v

Ожидаемые результаты:

FAILED test_create_task_naive_2 - asyncpg.UniqueViolationError:
    duplicate key value violates unique constraint "tasks_task_id_key"
 
FAILED test_list_tasks_sees_pollution - AssertionError:
    Test pollution detected! Found 2 tasks from previous tests.
    Tasks: ['test-task-1', 'concurrent-task']
 
FAILED test_concurrent_inserts_cause_conflicts - AssertionError:
    Expected UniqueViolationError but all inserts succeeded (БД грязная!)

Почему это происходит:

  1. Session-scoped fixtures — создаются один раз на всю сессию
  2. Нет транзакций — изменения коммитятся в БД
  3. Нет rollback — данные остаются после теста
  4. Parallel workers — несколько процессов пишут в одну БД одновременно
  5. Random order — порядок тестов влияет на результат

Это классический test pollution — нарушение изоляции тестов.

Шаг 2. Диагностика: видим проблему изнутри

Добавим логирование чтобы понять когда и как данные утекают между тестами.

# tests/integration/test_async_fixtures_debug.py
import logging
import asyncpg
import pytest
 
# Настраиваем логирование
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
 
@pytest.fixture
async def debug_db_pool():
    """
    Фикстура с логированием всех SQL операций.
    """
    pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        user="postgres",
        password="testpass",
        database="todo_test",
        min_size=2,
        max_size=10
    )
 
    # Логируем создание pool
    logger.info(f"[POOL] Created connection pool: {id(pool)}")
 
    yield pool
 
    # Логируем закрытие
    logger.info(f"[POOL] Closing connection pool: {id(pool)}")
    await pool.close()
 
 
@pytest.fixture
async def debug_app(debug_db_pool, redis_client, request):
    """
    Приложение с дебаг-логами: видим состояние БД до и после теста.
    """
    test_name = request.node.name
 
    # ДО ТЕСТА: сколько задач в БД?
    async with debug_db_pool.acquire() as conn:
        count_before = await conn.fetchval("SELECT COUNT(*) FROM tasks")
        logger.info(f"[{test_name}] BEFORE: {count_before} tasks in DB")
 
        if count_before > 0:
            tasks = await conn.fetch("SELECT task_id FROM tasks")
            logger.warning(f"[{test_name}] POLLUTION: {[t['task_id'] for t in tasks]}")
 
    # Создаём приложение
    db = NaiveAsyncDBStorage(debug_db_pool)
    cache = NaiveRedisCache(redis_client)
 
    class RemoteMock:
        def sync_task(self, task): pass
        def log(self, event, **kwargs): pass
 
    app = TodoApp(cache=cache, remote=RemoteMock())
    app.db = db
 
    yield app
 
    # ПОСЛЕ ТЕСТА: сколько задач осталось?
    async with debug_db_pool.acquire() as conn:
        count_after = await conn.fetchval("SELECT COUNT(*) FROM tasks")
        logger.info(f"[{test_name}] AFTER: {count_after} tasks in DB")
 
        if count_after > count_before:
            new_tasks = await conn.fetch(
                "SELECT task_id FROM tasks ORDER BY created_at DESC LIMIT $1",
                count_after - count_before
            )
            logger.info(f"[{test_name}] CREATED: {[t['task_id'] for t in new_tasks]}")
 
 
@pytest.mark.asyncio
async def test_debug_pollution_test_1(debug_app):
    """Тест 1: создаём task-alpha"""
    await debug_app.db.save_task("task-alpha", "First task")
    logger.info("[test_1] Created task-alpha")
 
 
@pytest.mark.asyncio
async def test_debug_pollution_test_2(debug_app):
    """Тест 2: создаём task-beta"""
    await debug_app.db.save_task("task-beta", "Second task")
    logger.info("[test_2] Created task-beta")
 
 
@pytest.mark.asyncio
async def test_debug_pollution_test_3(debug_app):
    """Тест 3: проверяем что видим"""
    tasks = await debug_app.db.list_tasks()
    logger.info(f"[test_3] Sees {len(tasks)} tasks: {[t['task_id'] for t in tasks]}")

Запуск с выводом логов:

pytest tests/integration/test_async_fixtures_debug.py -v -s

Что вы увидите:

[POOL] Created connection pool: 140123456789
[test_debug_pollution_test_1] BEFORE: 0 tasks in DB
[test_1] Created task-alpha
[test_debug_pollution_test_1] AFTER: 1 tasks in DB
[test_debug_pollution_test_1] CREATED: ['task-alpha']
PASSED
 
[test_debug_pollution_test_2] BEFORE: 1 tasks in DB
[test_debug_pollution_test_2] POLLUTION: ['task-alpha']  ← Видим задачу из test_1!
[test_2] Created task-beta
[test_debug_pollution_test_2] AFTER: 2 tasks in DB
[test_debug_pollution_test_2] CREATED: ['task-beta']
PASSED
 
[test_debug_pollution_test_3] BEFORE: 2 tasks in DB
[test_debug_pollution_test_3] POLLUTION: ['task-alpha', 'task-beta']  ← Видим ВСЁ!
[test_3] Sees 2 tasks: ['task-alpha', 'task-beta']
PASSED
 
[POOL] Closing connection pool: 140123456789

Диагноз:

  • Каждый тест видит данные всех предыдущих тестов
  • Connection pool один на всю сессию
  • Нет механизма очистки между тестами

Теперь проблема очевидна. Переходим к фиксу.

Шаг 3. Быстрый фикс: транзакции с rollback

Начинаем с самого простого и надёжного решения: обернуть каждый тест в транзакцию и откатить её после теста.

# tests/integration/test_async_fixtures_fixed.py
import asyncpg
import pytest
 
 
# ПРАВИЛЬНЫЕ ФИКСТУРЫ С ИЗОЛЯЦИЕЙ
 
@pytest.fixture(scope="session")
async def db_pool():
    """
    Session-scoped pool: создаётся один раз.
    Это правильно — pool дорогой в создании.
    """
    pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        user="postgres",
        password="testpass",
        database="todo_test",
        min_size=2,
        max_size=10
    )
    yield pool
    await pool.close()
 
 
@pytest.fixture
async def db_connection(db_pool):
    """
    Function-scoped connection: каждый тест получает своё подключение.
 
    КРИТИЧНО: используем ТРАНЗАКЦИИ с ROLLBACK для изоляции!
    """
    async with db_pool.acquire() as conn:
        # Начинаем транзакцию
        transaction = conn.transaction()
        await transaction.start()
 
        yield conn
 
        # ВАЖНО: откатываем транзакцию, НЕ коммитим!
        # Все изменения в тесте исчезнут
        await transaction.rollback()
 
 
@pytest.fixture
async def redis_client():
    """
    Redis client с cleanup после каждого теста.
    """
    import aioredis
    redis = await aioredis.from_url(
        "redis://localhost:6379",
        decode_responses=True
    )
 
    yield redis
 
    # CLEANUP: удаляем все ключи после теста
    await redis.flushdb()
    await redis.close()
 
 
class IsolatedDBStorage:
    """
    Storage который работает ВНУТРИ транзакции.
 
    Отличие от NaiveAsyncDBStorage: принимает connection (не pool).
    """
 
    def __init__(self, conn):
        self.conn = conn  # Используем готовое подключение с транзакцией
 
    async def save_task(self, task_id: str, text: str) -> None:
        # Работаем внутри транзакции фикстуры!
        await self.conn.execute(
            "INSERT INTO tasks (task_id, text) VALUES ($1, $2)",
            task_id, text
        )
 
    async def get_task(self, task_id: str) -> dict:
        row = await self.conn.fetchrow(
            "SELECT task_id, text, done FROM tasks WHERE task_id = $1",
            task_id
        )
        if not row:
            raise KeyError(f"Task {task_id} not found")
        return dict(row)
 
    async def list_tasks(self) -> list[dict]:
        rows = await self.conn.fetch("SELECT task_id, text, done FROM tasks")
        return [dict(row) for row in rows]
 
 
@pytest.fixture
async def isolated_app(db_connection, redis_client):
    """
    Приложение с ПОЛНОЙ изоляцией тестов.
 
    - DB: транзакция с rollback
    - Redis: flushdb после теста
    """
    db = IsolatedDBStorage(db_connection)
 
    class SimpleCache:
        def __init__(self, redis):
            self.redis = redis
 
        async def set(self, key: str, value: dict) -> None:
            import json
            await self.redis.set(key, json.dumps(value))
 
        async def get(self, key: str) -> dict | None:
            import json
            value = await self.redis.get(key)
            return json.loads(value) if value else None
 
    cache = SimpleCache(redis_client)
 
    class RemoteMock:
        def sync_task(self, task): pass
        def log(self, event, **kwargs): pass
 
    from src.app import TodoApp
    app = TodoApp(cache=cache, remote=RemoteMock())
    app.db = db
    return app
 
 
# ТЕСТЫ С ИЗОЛЯЦИЕЙ — СТАБИЛЬНЫ В ЛЮБОМ ПОРЯДКЕ
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_task_isolated_1(isolated_app):
    """
    Тест #1: создаём задачу с task_id="test-task-1"
    После теста: rollback, задачи НЕТ в БД.
    """
    task_id = "test-task-1"
    await isolated_app.db.save_task(task_id, "Buy milk")
 
    task = await isolated_app.db.get_task(task_id)
    assert task["text"] == "Buy milk"
    assert task["done"] is False
 
    # После теста: rollback удалит эту задачу!
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_task_isolated_2(isolated_app):
    """
    Тест #2: создаём задачу с ТАКИМ ЖЕ task_id="test-task-1"
 
    БЕЗ конфликтов! Тест #1 откатился, БД чистая.
    """
    task_id = "test-task-1"  # Тот же ID — но конфликта НЕТ!
    await isolated_app.db.save_task(task_id, "Buy milk")
 
    task = await isolated_app.db.get_task(task_id)
    assert task["text"] == "Buy milk"
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_list_tasks_no_pollution(isolated_app):
    """
    Тест #3: проверяем что БД чистая.
 
    ✅ PASS: видим 0 задач (нет pollution!)
    """
    tasks = await isolated_app.db.list_tasks()
    assert len(tasks) == 0, f"БД должна быть пустой, но нашли {len(tasks)} задач"
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_redis_isolated(isolated_app):
    """
    Тест #4: Redis тоже изолирован.
    """
    # Проверяем что Redis пустой
    assert await isolated_app.cache.get("some-key") is None
 
    # Добавляем данные
    await isolated_app.cache.set("some-key", {"value": 123})
 
    # Видим в текущем тесте
    cached = await isolated_app.cache.get("some-key")
    assert cached == {"value": 123}
 
    # После теста: flushdb() удалит ключ
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_concurrent_inserts_work_now(isolated_app):
    """
    Тест #5: параллельные вставки теперь работают.
 
    Каждый воркер pytest-xdist получает свою транзакцию.
    """
    task_id = "concurrent-task"
 
    # Создаём задачу один раз
    await isolated_app.db.save_task(task_id, "Test task")
 
    # Проверяем что она есть
    task = await isolated_app.db.get_task(task_id)
    assert task["text"] == "Test task"
 
    # После теста: rollback, других воркеров не затронет!

Запуск с проверкой стабильности:

# 1. Последовательно
pytest tests/integration/test_async_fixtures_fixed.py -v
# ✅ PASS (5/5)
 
# 2. Параллельно (4 воркера)
pytest tests/integration/test_async_fixtures_fixed.py -n 4 -v
# ✅ PASS (5/5)
 
# 3. Random order
pytest tests/integration/test_async_fixtures_fixed.py --random-order -v
# ✅ PASS (5/5)
 
# 4. Повторные запуски (проверка flakiness)
pytest tests/integration/test_async_fixtures_fixed.py --count=100 -v
# ✅ PASS (500/500) — 100 runs × 5 tests
 
# 5. СТРЕСС-ТЕСТ: параллельно + random + repeats
pytest tests/integration/test_async_fixtures_fixed.py -n 4 --random-order --count=20 -v
# ✅ PASS (2000/2000) — полная стабильность!

Результат:

=================== 500 passed in 12.34s ===================
0% flakiness, 100% test isolation

Почему это работает:

  1. Transaction per test — каждый тест в своей транзакции
  2. Rollback после теста — изменения НЕ попадают в БД
  3. Connection per test — каждый воркер с независимой транзакцией
  4. Redis flushdb — кеш чистится после теста

Это production-ready подход, используемый в Django, FastAPI, SQLAlchemy.

Шаг 4. Продвинутые варианты

Вариант 1: Nested transactions (savepoints)

Если ваш код сам использует транзакции, простой rollback недостаточен. Нужны savepoints (nested transactions).

@pytest.fixture
async def db_connection_with_savepoint(db_pool):
    """
    Фикстура с SAVEPOINT для nested transactions.
 
    Use case: ваш код делает conn.transaction() внутри себя.
    """
    async with db_pool.acquire() as conn:
        # Главная транзакция
        main_tx = conn.transaction()
        await main_tx.start()
 
        # Savepoint для nested transactions
        async with conn.transaction():
            yield conn
 
        # Откатываем всё
        await main_tx.rollback()
 
 
@pytest.mark.asyncio
async def test_nested_transactions(db_connection_with_savepoint):
    """
    Тест который вызывает код с внутренними транзакциями.
    """
    conn = db_connection_with_savepoint
 
    # Ваш код может делать:
    async with conn.transaction():
        await conn.execute(
            "INSERT INTO tasks (task_id, text) VALUES ($1, $2)",
            "nested-1", "Nested task"
        )
 
    # Всё ещё видим задачу (nested commit успешен)
    row = await conn.fetchrow("SELECT * FROM tasks WHERE task_id = $1", "nested-1")
    assert row is not None
 
    # После теста: главная транзакция откатится, задачи не будет!

Вариант 2: Factory fixtures для множественных подключений

Если тесту нужно несколько подключений (например, симуляция concurrent users):

@pytest.fixture
async def db_connection_factory(db_pool):
    """
    Factory: создаёт N изолированных подключений для теста.
 
    Use case: тестируем race conditions с реальными concurrent connections.
    """
    connections = []
    transactions = []
 
    async def create_connection():
        conn = await db_pool.acquire()
        tx = conn.transaction()
        await tx.start()
        connections.append(conn)
        transactions.append(tx)
        return conn
 
    yield create_connection
 
    # Cleanup: откатываем все транзакции
    for tx in transactions:
        await tx.rollback()
 
    for conn in connections:
        await db_pool.release(conn)
 
 
@pytest.mark.asyncio
async def test_concurrent_updates_with_multiple_connections(db_connection_factory):
    """
    Тест: два пользователя одновременно обновляют задачу.
    """
    # Создаём задачу в одном подключении
    conn1 = await db_connection_factory()
    await conn1.execute(
        "INSERT INTO tasks (task_id, text, done) VALUES ($1, $2, $3)",
        "shared-task", "Shared task", False
    )
 
    # Два пользователя пытаются пометить как done
    conn2 = await db_connection_factory()
    conn3 = await db_connection_factory()
 
    # Параллельные UPDATE
    await asyncio.gather(
        conn2.execute("UPDATE tasks SET done = TRUE WHERE task_id = $1", "shared-task"),
        conn3.execute("UPDATE tasks SET done = TRUE WHERE task_id = $1", "shared-task"),
    )
 
    # Проверяем что один победил (last write wins)
    row = await conn1.fetchrow("SELECT done FROM tasks WHERE task_id = $1", "shared-task")
    assert row["done"] is True

Вариант 3: Database-per-worker для полной изоляции

Если хотите максимальную изоляцию (каждый pytest-xdist worker в своей БД):

# conftest.py
import pytest
 
 
@pytest.fixture(scope="session")
def worker_id(request):
    """
    Получаем ID воркера pytest-xdist.
 
    Returns: "gw0", "gw1", "gw2", "gw3" или "master" (последовательный запуск)
    """
    if hasattr(request.config, "workerinput"):
        return request.config.workerinput["workerid"]
    return "master"
 
 
@pytest.fixture(scope="session")
async def db_pool_per_worker(worker_id):
    """
    Каждый воркер создаёт свою БД: todo_test_gw0, todo_test_gw1, etc.
 
    Pro: нулевая конкуренция между воркерами, можно не использовать транзакции
    Con: нужно создавать/удалять БД, медленнее
    """
    db_name = f"todo_test_{worker_id}"
 
    # Подключаемся к postgres БД для создания тестовой
    sys_pool = await asyncpg.create_pool(
        host="localhost",
        user="postgres",
        password="testpass",
        database="postgres"
    )
 
    async with sys_pool.acquire() as conn:
        # Удаляем если существует
        await conn.execute(f"DROP DATABASE IF EXISTS {db_name}")
        # Создаём новую
        await conn.execute(f"CREATE DATABASE {db_name}")
 
    await sys_pool.close()
 
    # Подключаемся к тестовой БД
    pool = await asyncpg.create_pool(
        host="localhost",
        user="postgres",
        password="testpass",
        database=db_name
    )
 
    # Применяем схему
    async with pool.acquire() as conn:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id SERIAL PRIMARY KEY,
                task_id VARCHAR(36) UNIQUE NOT NULL,
                text VARCHAR(500) NOT NULL,
                done BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
 
    yield pool
 
    await pool.close()
 
    # Cleanup: удаляем БД после всех тестов воркера
    sys_pool = await asyncpg.create_pool(
        host="localhost",
        user="postgres",
        password="testpass",
        database="postgres"
    )
 
    async with sys_pool.acquire() as conn:
        await conn.execute(f"DROP DATABASE IF EXISTS {db_name}")
 
    await sys_pool.close()

Когда использовать:

  • Тесты модифицируют схему БД (migrations)
  • Нужна полная изоляция без транзакций
  • Много воркеров (> 10), высокая конкуренция за connection pool

Trade-offs:

ПодходИзоляцияСкоростьСложность
Transaction + rollback✅ Отличная⚡ Быстро🟢 Просто
Savepoints✅ Отличная⚡ Быстро🟡 Средне
DB per worker✅ Абсолютная🐌 Медленно🔴 Сложно

Рекомендация: Используйте transaction + rollback (Вариант из Шага 3) — это золотой стандарт.

Шаг 5. Production patterns

Connection pool sizing

Проблема: При -n auto pytest создаёт N воркеров, каждый берёт подключение. Pool exhaustion!

Решение:

import os
 
 
@pytest.fixture(scope="session")
async def db_pool():
    """
    Pool sizing: min = воркеры, max = 2x воркеры.
    """
    # Определяем количество воркеров
    worker_count = int(os.getenv("PYTEST_XDIST_WORKER_COUNT", 1))
 
    pool = await asyncpg.create_pool(
        host="localhost",
        user="postgres",
        password="testpass",
        database="todo_test",
        min_size=worker_count,      # Минимум = воркеры
        max_size=worker_count * 2,  # Максимум = 2x воркеры
        command_timeout=10          # Таймаут на SQL команды
    )
    yield pool
    await pool.close()

Health checks для фикстур

Проблема: Тесты падают с cryptic errors если PostgreSQL/Redis недоступны.

Решение:

@pytest.fixture(scope="session", autouse=True)
async def check_services_health():
    """
    Autouse fixture: проверяет что все сервисы доступны ПЕРЕД запуском тестов.
    """
    import aioredis
 
    # Check PostgreSQL
    try:
        conn = await asyncpg.connect(
            host="localhost",
            user="postgres",
            password="testpass",
            database="todo_test",
            timeout=5
        )
        await conn.close()
    except Exception as e:
        pytest.exit(f"❌ PostgreSQL unavailable: {e}\nRun: docker start pytest-postgres")
 
    # Check Redis
    try:
        redis = await aioredis.from_url("redis://localhost:6379", socket_timeout=5)
        await redis.ping()
        await redis.close()
    except Exception as e:
        pytest.exit(f"❌ Redis unavailable: {e}\nRun: docker start pytest-redis")
 
    print("✅ All services healthy (PostgreSQL, Redis)")

Observability для async fixtures

Интегрируем метрики из урока 03:

from prometheus_client import Histogram, Counter
 
 
# Метрики для фикстур
fixture_setup_duration = Histogram(
    "pytest_fixture_setup_seconds",
    "Time spent setting up fixtures",
    ["fixture_name"]
)
 
fixture_teardown_duration = Histogram(
    "pytest_fixture_teardown_seconds",
    "Time spent tearing down fixtures",
    ["fixture_name"]
)
 
db_transactions = Counter(
    "pytest_db_transactions_total",
    "Total database transactions in tests",
    ["status"]  # committed or rolled_back
)
 
 
@pytest.fixture
async def db_connection_with_metrics(db_pool):
    """
    Фикстура с метриками: измеряем setup/teardown время.
    """
    import time
 
    # Setup
    start = time.time()
    async with db_pool.acquire() as conn:
        tx = conn.transaction()
        await tx.start()
 
        setup_time = time.time() - start
        fixture_setup_duration.labels(fixture_name="db_connection").observe(setup_time)
 
        yield conn
 
        # Teardown
        start = time.time()
        await tx.rollback()
        db_transactions.labels(status="rolled_back").inc()
 
        teardown_time = time.time() - start
        fixture_teardown_duration.labels(fixture_name="db_connection").observe(teardown_time)

CI integration

# .gitlab-ci.yml
test:integration:
  stage: test
  services:
    - postgres:15-alpine
    - redis:7-alpine
 
  variables:
    POSTGRES_DB: todo_test
    POSTGRES_USER: postgres
    POSTGRES_PASSWORD: testpass
    POSTGRES_HOST: postgres
    REDIS_HOST: redis
 
  script:
    # Apply schema
    - psql -h postgres -U postgres -d todo_test < schema.sql
 
    # Run tests with parallelism
    - pytest tests/integration/ -n auto -v --maxfail=3
 
    # Check for flaky tests
    - pytest tests/integration/ --count=10 --maxfail=1
 
  artifacts:
    when: always
    reports:
      junit: test-results.xml

Шаг 6. Проверяем качество фикстур

Checklist production-ready async fixtures:

# tests/integration/test_fixture_quality.py
import asyncio
import pytest
 
 
@pytest.mark.asyncio
async def test_fixtures_are_isolated(isolated_app):
    """
    Quality check #1: тесты изолированы.
 
    Запуск: pytest --count=100 (должен быть 0% flakiness)
    """
    tasks_before = await isolated_app.db.list_tasks()
    assert len(tasks_before) == 0, "БД должна быть пустой в начале теста"
 
    await isolated_app.db.save_task("test", "Test task")
    tasks_after = await isolated_app.db.list_tasks()
    assert len(tasks_after) == 1
 
 
@pytest.mark.asyncio
async def test_fixtures_are_fast(isolated_app):
    """
    Quality check #2: фикстуры быстрые.
 
    Requirement: setup + teardown < 100ms
    """
    import time
    start = time.time()
 
    # Минимальная операция с БД
    await isolated_app.db.list_tasks()
 
    duration = time.time() - start
    assert duration < 0.1, f"Фикстура слишком медленная: {duration:.3f}s"
 
 
@pytest.mark.asyncio
async def test_fixtures_handle_concurrent_access(db_connection_factory):
    """
    Quality check #3: фикстуры работают под нагрузкой.
 
    Запуск: pytest -n 4 (параллельные воркеры)
    """
    conns = [await db_connection_factory() for _ in range(10)]
 
    async def insert_task(conn, task_id):
        await conn.execute(
            "INSERT INTO tasks (task_id, text) VALUES ($1, $2)",
            task_id, f"Task {task_id}"
        )
 
    # 10 параллельных вставок
    await asyncio.gather(*[
        insert_task(conns[i], f"task-{i}")
        for i in range(10)
    ])
 
    # Все 10 должны быть в БД
    rows = await conns[0].fetch("SELECT COUNT(*) as cnt FROM tasks")
    assert rows[0]["cnt"] == 10
 
 
@pytest.mark.asyncio
async def test_fixtures_cleanup_properly(isolated_app):
    """
    Quality check #4: нет утечек ресурсов.
 
    Проверяем что после теста нет открытых connections.
    """
    # Создаём данные
    for i in range(100):
        await isolated_app.db.save_task(f"task-{i}", f"Task {i}")
 
    # После теста: rollback должен очистить всё
    # (проверяется в следующем тесте — он должен видеть 0 задач)

Запуск качественных проверок:

# 1. Стабильность (flakiness check)
pytest tests/integration/test_fixture_quality.py::test_fixtures_are_isolated --count=100
 
# 2. Производительность
pytest tests/integration/test_fixture_quality.py::test_fixtures_are_fast -v
 
# 3. Concurrency
pytest tests/integration/test_fixture_quality.py::test_fixtures_handle_concurrent_access -n 4
 
# 4. Cleanup
pytest tests/integration/test_fixture_quality.py::test_fixtures_cleanup_properly -v

📊 Business Impact & ROI

Problem cost (до фикса)

Situation:

  • Integration tests: 12-18% failure rate с -n 4
  • Investigation time: 8 hours per incident
  • Workaround: disabled parallel tests (4x slower CI)
  • Deployments blocked: 3 дня

Cost calculation:

Investigation: 8h × $100/h = $800
CI slowdown: 30 min → 120 min = 90 min/deploy
Deploys per day: 10
Extra CI cost: 90 min × 10 × $0.50/min = $450/day
3 days blocked: $450 × 3 = $1,350
 
Total incident cost: $800 + $1,350 = $2,150

Solution cost

Implementation:

  • Write proper async fixtures: 3 hours
  • Update tests: 2 hours
  • Testing & validation: 2 hours
  • Total: 7h × $100/h = $700

ROI

Monthly savings (после фикса):

  • Parallel tests enabled: CI 4x faster
  • CI time saved: 90 min → 30 min = 60 min/deploy × 200 deploys/month
  • Cost savings: 60 min × 200 × $0.50/min = $6,000/month
  • No investigation overhead: $800/incident saved

First year:

  • Savings: $6,000 × 12 = $72,000
  • Investment: $700
  • ROI: 10,186%
  • Break-even: 3.5 дня

Non-financial benefits

Developer confidence: Can run tests locally with -n autoDeployment velocity: No blocked pipelines ✅ Team morale: No more "flaky test hell" ✅ Onboarding: New developers see stable tests

Production checklist

После внедрения async fixtures убедитесь:

  • Изоляция: pytest --count=100 даёт 0% flakiness
  • Параллелизм: pytest -n auto стабильно работает
  • Производительность: Setup/teardown фикстур < 100ms каждая
  • Connection pool: Размер = 2x количество воркеров
  • Health checks: Services проверяются перед запуском тестов
  • CI integration: GitLab/GitHub Actions с PostgreSQL/Redis services
  • Метрики: Observability добавлена в критические фикстуры
  • Documentation: README с инструкциями setup для новых разработчиков

Что вынести с урока

  1. Test isolation — не опция, а требование для production-качества тестов
  2. Transaction + rollback — золотой стандарт для БД изоляции
  3. Session-scoped pool + function-scoped connections — правильная архитектура
  4. Redis flushdb — простейшая очистка кеша между тестами
  5. Health checks в фикстурах — better developer experience
  6. Connection pool sizing — 2x workers минимум для стабильности
  7. Измеряйте ROI — async fixtures экономят тысячи долларов

Главное отличие от "учебных примеров":

  • ❌ Не игнорируем проблему ("тесты иногда падают, ну и ладно")
  • ✅ Воспроизводим баг, измеряем impact, фиксим properly
  • ✅ Показываем trade-offs разных подходов (transaction vs DB-per-worker)
  • ✅ Даём production checklist и ROI расчёт

В следующем уроке: Гонки в кеше Redis vs БД — stale cache problem, Cache Aside pattern, distributed cache invalidation.