Переход на async: async/await фикстуры
Этот урок продолжает предыдущие два: мы уже знаем test isolation из урока 0 и race conditions из урока 1. Теперь переходим на async/await и настраиваем async фикстуры для PostgreSQL.
Главный фокус — test isolation: научиться писать async фикстуры, которые не создают гонок и утечек состояния между тестами.
Production Issue #1891
Репорт от CI:
"Integration tests randomly fail with
asyncpg.UniqueViolationError: duplicate key value violates unique constraint. Failure rate: 12-18% при запуске с-n 4(parallel workers). При последовательном запуске всё работает. Блокирует deployment уже 3 дня."
Симптомы:
- ✅
pytest tests/integration/— PASS (100%) - ❌
pytest tests/integration/ -n 4— FAIL (12-18%) - ❌
pytest tests/integration/ --random-order— FAIL (5-10%)
Root cause: Test pollution — тесты не изолированы, shared state в PostgreSQL переживает между запусками. Нет транзакций с rollback.
Стоимость инцидента:
- CI blocked: 3 дня
- Engineering investigation: 8 часов
- Workaround: отключили parallel tests (CI стал 4x медленнее)
- Total impact: ~$5,000 + 30 минут задержки на каждый deployment
Что такое test isolation (профессиональное определение)
Test isolation — гарантия что каждый тест запускается в чистом окружении без побочных эффектов от предыдущих тестов.
Основные источники утечек в асинхронном коде:
- Database state — данные в PostgreSQL/MySQL остаются после теста
- Cache state — Redis/Memcached не очищается
- Connection leaks — незакрытые async connections
- Background tasks — asyncio tasks продолжают работать
- Shared async resources — event loops, locks, queues
В этом уроке фокусируемся на #1 (database state) и #2 (cache state) — самые частые проблемы в async приложениях.
Подготовка
Расширяем проект из урока 00. Теперь добавляем настоящие БД вместо файлового кеша.
Установка зависимостей:
cd pytest-from-zero-to-confidence
git checkout lesson-01-async-fixtures # или создайте свою ветку от fixed
# Устанавливаем async драйверы
pip install asyncpg aioredis pytest-asyncio
# Запускаем PostgreSQL и Redis (Docker)
docker run -d --name pytest-postgres \
-e POSTGRES_PASSWORD=testpass \
-e POSTGRES_DB=todo_test \
-p 5432:5432 \
postgres:15-alpine
docker run -d --name pytest-redis \
-p 6379:6379 \
redis:7-alpine
# Проверяем что всё работает
docker ps # Должны видеть оба контейнераМинимальная схема БД:
-- schema.sql
CREATE TABLE IF NOT EXISTS tasks (
id SERIAL PRIMARY KEY,
task_id VARCHAR(36) UNIQUE NOT NULL,
text VARCHAR(500) NOT NULL,
done BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_tasks_task_id ON tasks(task_id);Применяем схему:
docker exec -i pytest-postgres psql -U postgres -d todo_test < schema.sqlНастройка pytest для async:
# pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto" # Автоматически определяет async тесты
markers = [
"asyncio: async tests",
"integration: integration tests with real DB"
]Шаг 1. Воспроизводим production bug
Создайте tests/integration/test_async_fixtures_naive.py. Мы намеренно создаём наивную реализацию БЕЗ транзакций, чтобы увидеть проблему.
import asyncio
import asyncpg
import pytest
from src.app import TodoApp
class NaiveAsyncDBStorage:
"""
ПЛОХАЯ реализация: нет изоляции между тестами!
Проблема: все тесты работают с одной БД, данные остаются после теста.
Результат: duplicate key violations при параллельном запуске.
"""
def __init__(self, pool):
self.pool = pool
async def save_task(self, task_id: str, text: str) -> None:
async with self.pool.acquire() as conn:
await conn.execute(
"INSERT INTO tasks (task_id, text) VALUES ($1, $2)",
task_id, text
)
async def get_task(self, task_id: str) -> dict:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT task_id, text, done FROM tasks WHERE task_id = $1",
task_id
)
if not row:
raise KeyError(f"Task {task_id} not found")
return dict(row)
async def list_tasks(self) -> list[dict]:
async with self.pool.acquire() as conn:
rows = await conn.fetch("SELECT task_id, text, done FROM tasks")
return [dict(row) for row in rows]
class NaiveRedisCache:
"""
ПЛОХАЯ реализация: нет cleanup между тестами!
Проблема: Redis сохраняет данные между тестами.
"""
def __init__(self, redis):
self.redis = redis
async def set(self, key: str, value: dict) -> None:
import json
await self.redis.set(key, json.dumps(value))
async def get(self, key: str) -> dict | None:
import json
value = await self.redis.get(key)
return json.loads(value) if value else None
# НАИВНЫЕ ФИКСТУРЫ БЕЗ ИЗОЛЯЦИИ
@pytest.fixture(scope="session")
async def db_pool():
"""
Session-scoped pool: создаётся один раз на всю тестовую сессию.
ПРОБЛЕМА: все тесты используют одну БД без очистки!
"""
pool = await asyncpg.create_pool(
host="localhost",
port=5432,
user="postgres",
password="testpass",
database="todo_test",
min_size=2,
max_size=10
)
yield pool
await pool.close()
@pytest.fixture(scope="session")
async def redis_client():
"""
Session-scoped Redis: создаётся один раз.
ПРОБЛЕМА: все тесты видят данные друг друга!
"""
import aioredis
redis = await aioredis.from_url(
"redis://localhost:6379",
decode_responses=True
)
yield redis
await redis.close()
@pytest.fixture
async def naive_app(db_pool, redis_client):
"""
Приложение с наивными (плохими) фикстурами.
НЕТ очистки БД, НЕТ транзакций, НЕТ rollback!
"""
db = NaiveAsyncDBStorage(db_pool)
cache = NaiveRedisCache(redis_client)
# Remote mock для совместимости с TodoApp
class RemoteMock:
def sync_task(self, task):
pass
def log(self, event, **kwargs):
pass
app = TodoApp(cache=cache, remote=RemoteMock())
app.db = db # Добавляем DB к приложению
return app
# ТЕСТЫ КОТОРЫЕ ПОКАЗЫВАЮТ ПРОБЛЕМУ
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_task_naive_1(naive_app):
"""
Тест #1: создаём задачу с task_id="test-task-1"
"""
task_id = "test-task-1"
await naive_app.db.save_task(task_id, "Buy milk")
task = await naive_app.db.get_task(task_id)
assert task["text"] == "Buy milk"
assert task["done"] is False
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_task_naive_2(naive_app):
"""
Тест #2: создаём задачу с ТАКИМ ЖЕ task_id="test-task-1"
При последовательном запуске: PASS (БД очищается между pytest sessions)
При параллельном запуске: FAIL (duplicate key violation)
"""
task_id = "test-task-1" # Конфликт с test #1!
await naive_app.db.save_task(task_id, "Buy milk")
task = await naive_app.db.get_task(task_id)
assert task["text"] == "Buy milk"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_list_tasks_sees_pollution(naive_app):
"""
Тест #3: проверяем сколько задач в БД
Ожидание: 0 задач (чистая БД)
Реальность: видим задачи из предыдущих тестов!
"""
tasks = await naive_app.db.list_tasks()
# БАГ: видим задачи от других тестов!
# При последовательном запуске: может быть 0 или больше (зависит от порядка)
# При параллельном: точно будут задачи от других воркеров
assert len(tasks) == 0, (
f"Test pollution detected! Found {len(tasks)} tasks from previous tests. "
f"Tasks: {[t['task_id'] for t in tasks]}"
)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_redis_cache_pollution(naive_app):
"""
Тест #4: проверяем что Redis тоже не изолирован
"""
# Добавляем что-то в кеш
await naive_app.cache.set("test-key", {"value": "test-data"})
# В следующем тесте этот ключ всё ещё будет!
cached = await naive_app.cache.get("test-key")
assert cached is not None
@pytest.mark.asyncio
@pytest.mark.integration
async def test_concurrent_inserts_cause_conflicts(naive_app):
"""
Тест #5: параллельные вставки с одинаковыми ID
Воспроизводим реальную проблему CI: несколько воркеров
пытаются создать задачу с одинаковым ID.
"""
task_id = "concurrent-task"
async def create_task():
await naive_app.db.save_task(task_id, "Test task")
# Пытаемся создать 5 раз параллельно
# Если нет транзакций — все 5 запросов попадут в БД
# Один успешен, 4 упадут с UniqueViolationError
with pytest.raises(asyncpg.UniqueViolationError):
await asyncio.gather(
create_task(),
create_task(),
create_task(),
create_task(),
create_task(),
)Запуск и проверка проблемы:
# 1. Последовательный запуск (может пройти или упасть — недетерминированно)
pytest tests/integration/test_async_fixtures_naive.py -v
# 2. Параллельный запуск (скорее всего упадёт)
pytest tests/integration/test_async_fixtures_naive.py -n 4 -v
# 3. Random order (точно упадёт)
pytest tests/integration/test_async_fixtures_naive.py --random-order -v
# 4. Повторные запуски (flaky behaviour)
pytest tests/integration/test_async_fixtures_naive.py --count=10 -vОжидаемые результаты:
FAILED test_create_task_naive_2 - asyncpg.UniqueViolationError:
duplicate key value violates unique constraint "tasks_task_id_key"
FAILED test_list_tasks_sees_pollution - AssertionError:
Test pollution detected! Found 2 tasks from previous tests.
Tasks: ['test-task-1', 'concurrent-task']
FAILED test_concurrent_inserts_cause_conflicts - AssertionError:
Expected UniqueViolationError but all inserts succeeded (БД грязная!)Почему это происходит:
- Session-scoped fixtures — создаются один раз на всю сессию
- Нет транзакций — изменения коммитятся в БД
- Нет rollback — данные остаются после теста
- Parallel workers — несколько процессов пишут в одну БД одновременно
- Random order — порядок тестов влияет на результат
Это классический test pollution — нарушение изоляции тестов.
Шаг 2. Диагностика: видим проблему изнутри
Добавим логирование чтобы понять когда и как данные утекают между тестами.
# tests/integration/test_async_fixtures_debug.py
import logging
import asyncpg
import pytest
# Настраиваем логирование
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@pytest.fixture
async def debug_db_pool():
"""
Фикстура с логированием всех SQL операций.
"""
pool = await asyncpg.create_pool(
host="localhost",
port=5432,
user="postgres",
password="testpass",
database="todo_test",
min_size=2,
max_size=10
)
# Логируем создание pool
logger.info(f"[POOL] Created connection pool: {id(pool)}")
yield pool
# Логируем закрытие
logger.info(f"[POOL] Closing connection pool: {id(pool)}")
await pool.close()
@pytest.fixture
async def debug_app(debug_db_pool, redis_client, request):
"""
Приложение с дебаг-логами: видим состояние БД до и после теста.
"""
test_name = request.node.name
# ДО ТЕСТА: сколько задач в БД?
async with debug_db_pool.acquire() as conn:
count_before = await conn.fetchval("SELECT COUNT(*) FROM tasks")
logger.info(f"[{test_name}] BEFORE: {count_before} tasks in DB")
if count_before > 0:
tasks = await conn.fetch("SELECT task_id FROM tasks")
logger.warning(f"[{test_name}] POLLUTION: {[t['task_id'] for t in tasks]}")
# Создаём приложение
db = NaiveAsyncDBStorage(debug_db_pool)
cache = NaiveRedisCache(redis_client)
class RemoteMock:
def sync_task(self, task): pass
def log(self, event, **kwargs): pass
app = TodoApp(cache=cache, remote=RemoteMock())
app.db = db
yield app
# ПОСЛЕ ТЕСТА: сколько задач осталось?
async with debug_db_pool.acquire() as conn:
count_after = await conn.fetchval("SELECT COUNT(*) FROM tasks")
logger.info(f"[{test_name}] AFTER: {count_after} tasks in DB")
if count_after > count_before:
new_tasks = await conn.fetch(
"SELECT task_id FROM tasks ORDER BY created_at DESC LIMIT $1",
count_after - count_before
)
logger.info(f"[{test_name}] CREATED: {[t['task_id'] for t in new_tasks]}")
@pytest.mark.asyncio
async def test_debug_pollution_test_1(debug_app):
"""Тест 1: создаём task-alpha"""
await debug_app.db.save_task("task-alpha", "First task")
logger.info("[test_1] Created task-alpha")
@pytest.mark.asyncio
async def test_debug_pollution_test_2(debug_app):
"""Тест 2: создаём task-beta"""
await debug_app.db.save_task("task-beta", "Second task")
logger.info("[test_2] Created task-beta")
@pytest.mark.asyncio
async def test_debug_pollution_test_3(debug_app):
"""Тест 3: проверяем что видим"""
tasks = await debug_app.db.list_tasks()
logger.info(f"[test_3] Sees {len(tasks)} tasks: {[t['task_id'] for t in tasks]}")Запуск с выводом логов:
pytest tests/integration/test_async_fixtures_debug.py -v -sЧто вы увидите:
[POOL] Created connection pool: 140123456789
[test_debug_pollution_test_1] BEFORE: 0 tasks in DB
[test_1] Created task-alpha
[test_debug_pollution_test_1] AFTER: 1 tasks in DB
[test_debug_pollution_test_1] CREATED: ['task-alpha']
PASSED
[test_debug_pollution_test_2] BEFORE: 1 tasks in DB
[test_debug_pollution_test_2] POLLUTION: ['task-alpha'] ← Видим задачу из test_1!
[test_2] Created task-beta
[test_debug_pollution_test_2] AFTER: 2 tasks in DB
[test_debug_pollution_test_2] CREATED: ['task-beta']
PASSED
[test_debug_pollution_test_3] BEFORE: 2 tasks in DB
[test_debug_pollution_test_3] POLLUTION: ['task-alpha', 'task-beta'] ← Видим ВСЁ!
[test_3] Sees 2 tasks: ['task-alpha', 'task-beta']
PASSED
[POOL] Closing connection pool: 140123456789Диагноз:
- Каждый тест видит данные всех предыдущих тестов
- Connection pool один на всю сессию
- Нет механизма очистки между тестами
Теперь проблема очевидна. Переходим к фиксу.
Шаг 3. Быстрый фикс: транзакции с rollback
Начинаем с самого простого и надёжного решения: обернуть каждый тест в транзакцию и откатить её после теста.
# tests/integration/test_async_fixtures_fixed.py
import asyncpg
import pytest
# ПРАВИЛЬНЫЕ ФИКСТУРЫ С ИЗОЛЯЦИЕЙ
@pytest.fixture(scope="session")
async def db_pool():
"""
Session-scoped pool: создаётся один раз.
Это правильно — pool дорогой в создании.
"""
pool = await asyncpg.create_pool(
host="localhost",
port=5432,
user="postgres",
password="testpass",
database="todo_test",
min_size=2,
max_size=10
)
yield pool
await pool.close()
@pytest.fixture
async def db_connection(db_pool):
"""
Function-scoped connection: каждый тест получает своё подключение.
КРИТИЧНО: используем ТРАНЗАКЦИИ с ROLLBACK для изоляции!
"""
async with db_pool.acquire() as conn:
# Начинаем транзакцию
transaction = conn.transaction()
await transaction.start()
yield conn
# ВАЖНО: откатываем транзакцию, НЕ коммитим!
# Все изменения в тесте исчезнут
await transaction.rollback()
@pytest.fixture
async def redis_client():
"""
Redis client с cleanup после каждого теста.
"""
import aioredis
redis = await aioredis.from_url(
"redis://localhost:6379",
decode_responses=True
)
yield redis
# CLEANUP: удаляем все ключи после теста
await redis.flushdb()
await redis.close()
class IsolatedDBStorage:
"""
Storage который работает ВНУТРИ транзакции.
Отличие от NaiveAsyncDBStorage: принимает connection (не pool).
"""
def __init__(self, conn):
self.conn = conn # Используем готовое подключение с транзакцией
async def save_task(self, task_id: str, text: str) -> None:
# Работаем внутри транзакции фикстуры!
await self.conn.execute(
"INSERT INTO tasks (task_id, text) VALUES ($1, $2)",
task_id, text
)
async def get_task(self, task_id: str) -> dict:
row = await self.conn.fetchrow(
"SELECT task_id, text, done FROM tasks WHERE task_id = $1",
task_id
)
if not row:
raise KeyError(f"Task {task_id} not found")
return dict(row)
async def list_tasks(self) -> list[dict]:
rows = await self.conn.fetch("SELECT task_id, text, done FROM tasks")
return [dict(row) for row in rows]
@pytest.fixture
async def isolated_app(db_connection, redis_client):
"""
Приложение с ПОЛНОЙ изоляцией тестов.
- DB: транзакция с rollback
- Redis: flushdb после теста
"""
db = IsolatedDBStorage(db_connection)
class SimpleCache:
def __init__(self, redis):
self.redis = redis
async def set(self, key: str, value: dict) -> None:
import json
await self.redis.set(key, json.dumps(value))
async def get(self, key: str) -> dict | None:
import json
value = await self.redis.get(key)
return json.loads(value) if value else None
cache = SimpleCache(redis_client)
class RemoteMock:
def sync_task(self, task): pass
def log(self, event, **kwargs): pass
from src.app import TodoApp
app = TodoApp(cache=cache, remote=RemoteMock())
app.db = db
return app
# ТЕСТЫ С ИЗОЛЯЦИЕЙ — СТАБИЛЬНЫ В ЛЮБОМ ПОРЯДКЕ
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_task_isolated_1(isolated_app):
"""
Тест #1: создаём задачу с task_id="test-task-1"
После теста: rollback, задачи НЕТ в БД.
"""
task_id = "test-task-1"
await isolated_app.db.save_task(task_id, "Buy milk")
task = await isolated_app.db.get_task(task_id)
assert task["text"] == "Buy milk"
assert task["done"] is False
# После теста: rollback удалит эту задачу!
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_task_isolated_2(isolated_app):
"""
Тест #2: создаём задачу с ТАКИМ ЖЕ task_id="test-task-1"
БЕЗ конфликтов! Тест #1 откатился, БД чистая.
"""
task_id = "test-task-1" # Тот же ID — но конфликта НЕТ!
await isolated_app.db.save_task(task_id, "Buy milk")
task = await isolated_app.db.get_task(task_id)
assert task["text"] == "Buy milk"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_list_tasks_no_pollution(isolated_app):
"""
Тест #3: проверяем что БД чистая.
✅ PASS: видим 0 задач (нет pollution!)
"""
tasks = await isolated_app.db.list_tasks()
assert len(tasks) == 0, f"БД должна быть пустой, но нашли {len(tasks)} задач"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_redis_isolated(isolated_app):
"""
Тест #4: Redis тоже изолирован.
"""
# Проверяем что Redis пустой
assert await isolated_app.cache.get("some-key") is None
# Добавляем данные
await isolated_app.cache.set("some-key", {"value": 123})
# Видим в текущем тесте
cached = await isolated_app.cache.get("some-key")
assert cached == {"value": 123}
# После теста: flushdb() удалит ключ
@pytest.mark.asyncio
@pytest.mark.integration
async def test_concurrent_inserts_work_now(isolated_app):
"""
Тест #5: параллельные вставки теперь работают.
Каждый воркер pytest-xdist получает свою транзакцию.
"""
task_id = "concurrent-task"
# Создаём задачу один раз
await isolated_app.db.save_task(task_id, "Test task")
# Проверяем что она есть
task = await isolated_app.db.get_task(task_id)
assert task["text"] == "Test task"
# После теста: rollback, других воркеров не затронет!Запуск с проверкой стабильности:
# 1. Последовательно
pytest tests/integration/test_async_fixtures_fixed.py -v
# ✅ PASS (5/5)
# 2. Параллельно (4 воркера)
pytest tests/integration/test_async_fixtures_fixed.py -n 4 -v
# ✅ PASS (5/5)
# 3. Random order
pytest tests/integration/test_async_fixtures_fixed.py --random-order -v
# ✅ PASS (5/5)
# 4. Повторные запуски (проверка flakiness)
pytest tests/integration/test_async_fixtures_fixed.py --count=100 -v
# ✅ PASS (500/500) — 100 runs × 5 tests
# 5. СТРЕСС-ТЕСТ: параллельно + random + repeats
pytest tests/integration/test_async_fixtures_fixed.py -n 4 --random-order --count=20 -v
# ✅ PASS (2000/2000) — полная стабильность!Результат:
=================== 500 passed in 12.34s ===================
0% flakiness, 100% test isolationПочему это работает:
- Transaction per test — каждый тест в своей транзакции
- Rollback после теста — изменения НЕ попадают в БД
- Connection per test — каждый воркер с независимой транзакцией
- Redis flushdb — кеш чистится после теста
Это production-ready подход, используемый в Django, FastAPI, SQLAlchemy.
Шаг 4. Продвинутые варианты
Вариант 1: Nested transactions (savepoints)
Если ваш код сам использует транзакции, простой rollback недостаточен. Нужны savepoints (nested transactions).
@pytest.fixture
async def db_connection_with_savepoint(db_pool):
"""
Фикстура с SAVEPOINT для nested transactions.
Use case: ваш код делает conn.transaction() внутри себя.
"""
async with db_pool.acquire() as conn:
# Главная транзакция
main_tx = conn.transaction()
await main_tx.start()
# Savepoint для nested transactions
async with conn.transaction():
yield conn
# Откатываем всё
await main_tx.rollback()
@pytest.mark.asyncio
async def test_nested_transactions(db_connection_with_savepoint):
"""
Тест который вызывает код с внутренними транзакциями.
"""
conn = db_connection_with_savepoint
# Ваш код может делать:
async with conn.transaction():
await conn.execute(
"INSERT INTO tasks (task_id, text) VALUES ($1, $2)",
"nested-1", "Nested task"
)
# Всё ещё видим задачу (nested commit успешен)
row = await conn.fetchrow("SELECT * FROM tasks WHERE task_id = $1", "nested-1")
assert row is not None
# После теста: главная транзакция откатится, задачи не будет!Вариант 2: Factory fixtures для множественных подключений
Если тесту нужно несколько подключений (например, симуляция concurrent users):
@pytest.fixture
async def db_connection_factory(db_pool):
"""
Factory: создаёт N изолированных подключений для теста.
Use case: тестируем race conditions с реальными concurrent connections.
"""
connections = []
transactions = []
async def create_connection():
conn = await db_pool.acquire()
tx = conn.transaction()
await tx.start()
connections.append(conn)
transactions.append(tx)
return conn
yield create_connection
# Cleanup: откатываем все транзакции
for tx in transactions:
await tx.rollback()
for conn in connections:
await db_pool.release(conn)
@pytest.mark.asyncio
async def test_concurrent_updates_with_multiple_connections(db_connection_factory):
"""
Тест: два пользователя одновременно обновляют задачу.
"""
# Создаём задачу в одном подключении
conn1 = await db_connection_factory()
await conn1.execute(
"INSERT INTO tasks (task_id, text, done) VALUES ($1, $2, $3)",
"shared-task", "Shared task", False
)
# Два пользователя пытаются пометить как done
conn2 = await db_connection_factory()
conn3 = await db_connection_factory()
# Параллельные UPDATE
await asyncio.gather(
conn2.execute("UPDATE tasks SET done = TRUE WHERE task_id = $1", "shared-task"),
conn3.execute("UPDATE tasks SET done = TRUE WHERE task_id = $1", "shared-task"),
)
# Проверяем что один победил (last write wins)
row = await conn1.fetchrow("SELECT done FROM tasks WHERE task_id = $1", "shared-task")
assert row["done"] is TrueВариант 3: Database-per-worker для полной изоляции
Если хотите максимальную изоляцию (каждый pytest-xdist worker в своей БД):
# conftest.py
import pytest
@pytest.fixture(scope="session")
def worker_id(request):
"""
Получаем ID воркера pytest-xdist.
Returns: "gw0", "gw1", "gw2", "gw3" или "master" (последовательный запуск)
"""
if hasattr(request.config, "workerinput"):
return request.config.workerinput["workerid"]
return "master"
@pytest.fixture(scope="session")
async def db_pool_per_worker(worker_id):
"""
Каждый воркер создаёт свою БД: todo_test_gw0, todo_test_gw1, etc.
Pro: нулевая конкуренция между воркерами, можно не использовать транзакции
Con: нужно создавать/удалять БД, медленнее
"""
db_name = f"todo_test_{worker_id}"
# Подключаемся к postgres БД для создания тестовой
sys_pool = await asyncpg.create_pool(
host="localhost",
user="postgres",
password="testpass",
database="postgres"
)
async with sys_pool.acquire() as conn:
# Удаляем если существует
await conn.execute(f"DROP DATABASE IF EXISTS {db_name}")
# Создаём новую
await conn.execute(f"CREATE DATABASE {db_name}")
await sys_pool.close()
# Подключаемся к тестовой БД
pool = await asyncpg.create_pool(
host="localhost",
user="postgres",
password="testpass",
database=db_name
)
# Применяем схему
async with pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id SERIAL PRIMARY KEY,
task_id VARCHAR(36) UNIQUE NOT NULL,
text VARCHAR(500) NOT NULL,
done BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT NOW()
)
""")
yield pool
await pool.close()
# Cleanup: удаляем БД после всех тестов воркера
sys_pool = await asyncpg.create_pool(
host="localhost",
user="postgres",
password="testpass",
database="postgres"
)
async with sys_pool.acquire() as conn:
await conn.execute(f"DROP DATABASE IF EXISTS {db_name}")
await sys_pool.close()Когда использовать:
- Тесты модифицируют схему БД (migrations)
- Нужна полная изоляция без транзакций
- Много воркеров (> 10), высокая конкуренция за connection pool
Trade-offs:
| Подход | Изоляция | Скорость | Сложность |
|---|---|---|---|
| Transaction + rollback | ✅ Отличная | ⚡ Быстро | 🟢 Просто |
| Savepoints | ✅ Отличная | ⚡ Быстро | 🟡 Средне |
| DB per worker | ✅ Абсолютная | 🐌 Медленно | 🔴 Сложно |
Рекомендация: Используйте transaction + rollback (Вариант из Шага 3) — это золотой стандарт.
Шаг 5. Production patterns
Connection pool sizing
Проблема: При -n auto pytest создаёт N воркеров, каждый берёт подключение. Pool exhaustion!
Решение:
import os
@pytest.fixture(scope="session")
async def db_pool():
"""
Pool sizing: min = воркеры, max = 2x воркеры.
"""
# Определяем количество воркеров
worker_count = int(os.getenv("PYTEST_XDIST_WORKER_COUNT", 1))
pool = await asyncpg.create_pool(
host="localhost",
user="postgres",
password="testpass",
database="todo_test",
min_size=worker_count, # Минимум = воркеры
max_size=worker_count * 2, # Максимум = 2x воркеры
command_timeout=10 # Таймаут на SQL команды
)
yield pool
await pool.close()Health checks для фикстур
Проблема: Тесты падают с cryptic errors если PostgreSQL/Redis недоступны.
Решение:
@pytest.fixture(scope="session", autouse=True)
async def check_services_health():
"""
Autouse fixture: проверяет что все сервисы доступны ПЕРЕД запуском тестов.
"""
import aioredis
# Check PostgreSQL
try:
conn = await asyncpg.connect(
host="localhost",
user="postgres",
password="testpass",
database="todo_test",
timeout=5
)
await conn.close()
except Exception as e:
pytest.exit(f"❌ PostgreSQL unavailable: {e}\nRun: docker start pytest-postgres")
# Check Redis
try:
redis = await aioredis.from_url("redis://localhost:6379", socket_timeout=5)
await redis.ping()
await redis.close()
except Exception as e:
pytest.exit(f"❌ Redis unavailable: {e}\nRun: docker start pytest-redis")
print("✅ All services healthy (PostgreSQL, Redis)")Observability для async fixtures
Интегрируем метрики из урока 03:
from prometheus_client import Histogram, Counter
# Метрики для фикстур
fixture_setup_duration = Histogram(
"pytest_fixture_setup_seconds",
"Time spent setting up fixtures",
["fixture_name"]
)
fixture_teardown_duration = Histogram(
"pytest_fixture_teardown_seconds",
"Time spent tearing down fixtures",
["fixture_name"]
)
db_transactions = Counter(
"pytest_db_transactions_total",
"Total database transactions in tests",
["status"] # committed or rolled_back
)
@pytest.fixture
async def db_connection_with_metrics(db_pool):
"""
Фикстура с метриками: измеряем setup/teardown время.
"""
import time
# Setup
start = time.time()
async with db_pool.acquire() as conn:
tx = conn.transaction()
await tx.start()
setup_time = time.time() - start
fixture_setup_duration.labels(fixture_name="db_connection").observe(setup_time)
yield conn
# Teardown
start = time.time()
await tx.rollback()
db_transactions.labels(status="rolled_back").inc()
teardown_time = time.time() - start
fixture_teardown_duration.labels(fixture_name="db_connection").observe(teardown_time)CI integration
# .gitlab-ci.yml
test:integration:
stage: test
services:
- postgres:15-alpine
- redis:7-alpine
variables:
POSTGRES_DB: todo_test
POSTGRES_USER: postgres
POSTGRES_PASSWORD: testpass
POSTGRES_HOST: postgres
REDIS_HOST: redis
script:
# Apply schema
- psql -h postgres -U postgres -d todo_test < schema.sql
# Run tests with parallelism
- pytest tests/integration/ -n auto -v --maxfail=3
# Check for flaky tests
- pytest tests/integration/ --count=10 --maxfail=1
artifacts:
when: always
reports:
junit: test-results.xmlШаг 6. Проверяем качество фикстур
Checklist production-ready async fixtures:
# tests/integration/test_fixture_quality.py
import asyncio
import pytest
@pytest.mark.asyncio
async def test_fixtures_are_isolated(isolated_app):
"""
Quality check #1: тесты изолированы.
Запуск: pytest --count=100 (должен быть 0% flakiness)
"""
tasks_before = await isolated_app.db.list_tasks()
assert len(tasks_before) == 0, "БД должна быть пустой в начале теста"
await isolated_app.db.save_task("test", "Test task")
tasks_after = await isolated_app.db.list_tasks()
assert len(tasks_after) == 1
@pytest.mark.asyncio
async def test_fixtures_are_fast(isolated_app):
"""
Quality check #2: фикстуры быстрые.
Requirement: setup + teardown < 100ms
"""
import time
start = time.time()
# Минимальная операция с БД
await isolated_app.db.list_tasks()
duration = time.time() - start
assert duration < 0.1, f"Фикстура слишком медленная: {duration:.3f}s"
@pytest.mark.asyncio
async def test_fixtures_handle_concurrent_access(db_connection_factory):
"""
Quality check #3: фикстуры работают под нагрузкой.
Запуск: pytest -n 4 (параллельные воркеры)
"""
conns = [await db_connection_factory() for _ in range(10)]
async def insert_task(conn, task_id):
await conn.execute(
"INSERT INTO tasks (task_id, text) VALUES ($1, $2)",
task_id, f"Task {task_id}"
)
# 10 параллельных вставок
await asyncio.gather(*[
insert_task(conns[i], f"task-{i}")
for i in range(10)
])
# Все 10 должны быть в БД
rows = await conns[0].fetch("SELECT COUNT(*) as cnt FROM tasks")
assert rows[0]["cnt"] == 10
@pytest.mark.asyncio
async def test_fixtures_cleanup_properly(isolated_app):
"""
Quality check #4: нет утечек ресурсов.
Проверяем что после теста нет открытых connections.
"""
# Создаём данные
for i in range(100):
await isolated_app.db.save_task(f"task-{i}", f"Task {i}")
# После теста: rollback должен очистить всё
# (проверяется в следующем тесте — он должен видеть 0 задач)Запуск качественных проверок:
# 1. Стабильность (flakiness check)
pytest tests/integration/test_fixture_quality.py::test_fixtures_are_isolated --count=100
# 2. Производительность
pytest tests/integration/test_fixture_quality.py::test_fixtures_are_fast -v
# 3. Concurrency
pytest tests/integration/test_fixture_quality.py::test_fixtures_handle_concurrent_access -n 4
# 4. Cleanup
pytest tests/integration/test_fixture_quality.py::test_fixtures_cleanup_properly -v📊 Business Impact & ROI
Problem cost (до фикса)
Situation:
- Integration tests: 12-18% failure rate с
-n 4 - Investigation time: 8 hours per incident
- Workaround: disabled parallel tests (4x slower CI)
- Deployments blocked: 3 дня
Cost calculation:
Investigation: 8h × $100/h = $800
CI slowdown: 30 min → 120 min = 90 min/deploy
Deploys per day: 10
Extra CI cost: 90 min × 10 × $0.50/min = $450/day
3 days blocked: $450 × 3 = $1,350
Total incident cost: $800 + $1,350 = $2,150Solution cost
Implementation:
- Write proper async fixtures: 3 hours
- Update tests: 2 hours
- Testing & validation: 2 hours
- Total: 7h × $100/h = $700
ROI
Monthly savings (после фикса):
- Parallel tests enabled: CI 4x faster
- CI time saved: 90 min → 30 min = 60 min/deploy × 200 deploys/month
- Cost savings: 60 min × 200 × $0.50/min = $6,000/month
- No investigation overhead: $800/incident saved
First year:
- Savings: $6,000 × 12 = $72,000
- Investment: $700
- ROI: 10,186%
- Break-even: 3.5 дня
Non-financial benefits
✅ Developer confidence: Can run tests locally with -n auto
✅ Deployment velocity: No blocked pipelines
✅ Team morale: No more "flaky test hell"
✅ Onboarding: New developers see stable tests
Production checklist
После внедрения async fixtures убедитесь:
- Изоляция:
pytest --count=100даёт 0% flakiness - Параллелизм:
pytest -n autoстабильно работает - Производительность: Setup/teardown фикстур < 100ms каждая
- Connection pool: Размер = 2x количество воркеров
- Health checks: Services проверяются перед запуском тестов
- CI integration: GitLab/GitHub Actions с PostgreSQL/Redis services
- Метрики: Observability добавлена в критические фикстуры
- Documentation: README с инструкциями setup для новых разработчиков
Что вынести с урока
- Test isolation — не опция, а требование для production-качества тестов
- Transaction + rollback — золотой стандарт для БД изоляции
- Session-scoped pool + function-scoped connections — правильная архитектура
- Redis flushdb — простейшая очистка кеша между тестами
- Health checks в фикстурах — better developer experience
- Connection pool sizing — 2x workers минимум для стабильности
- Измеряйте ROI — async fixtures экономят тысячи долларов
Главное отличие от "учебных примеров":
- ❌ Не игнорируем проблему ("тесты иногда падают, ну и ладно")
- ✅ Воспроизводим баг, измеряем impact, фиксим properly
- ✅ Показываем trade-offs разных подходов (transaction vs DB-per-worker)
- ✅ Даём production checklist и ROI расчёт
В следующем уроке: Гонки в кеше Redis vs БД — stale cache problem, Cache Aside pattern, distributed cache invalidation.