Перейти к содержимому
К программе курса
Pytest: Борьба с Race Conditions в Async-коде
5 / 956%

Async Redis фикстуры и Cache Aside

50 минут

В этом уроке добавляем Redis в наш tech stack и настраиваем фикстуры для тестирования кеша. Это фундамент перед следующим уроком про гонки кеша.

Главная цель: научиться тестировать приложения с Redis, понять Cache Aside pattern и test isolation для кеша.

Зачем Redis в тестах?

Production реальность:

  • 90% современных приложений используют Redis для кеширования
  • Кеш вносит свои race conditions и flaky tests
  • Нельзя тестировать только PostgreSQL, игнорируя Redis

Наша задача: научиться писать чистые, изолированные тесты с Redis.

Подготовка: запускаем Redis

Docker:

docker run -d --name pytest-redis \
  -p 6379:6379 \
  redis:7-alpine

Проверяем:

docker exec -it pytest-redis redis-cli ping
# Должно вернуть: PONG

Устанавливаем клиент:

pip install redis[asyncio]
# или
pip install aioredis

Redis фикстура с изоляцией

Проблема test pollution:

Если не чистить Redis между тестами, данные одного теста влияют на другой.

Правильная фикстура:

# conftest.py
import pytest
import redis.asyncio as aioredis
 
@pytest.fixture
async def redis_client():
    """
    Redis клиент с автоматической очисткой после теста.
    Используем отдельную БД для тестов.
    """
    client = aioredis.from_url(
        "redis://localhost:6379/15",  # DB 15 для тестов
        decode_responses=True
    )
 
    yield client
 
    # Чистим БД после теста
    await client.flushdb()
    await client.close()

Почему DB 15?

  • Redis имеет 16 БД (0-15)
  • Production обычно использует DB 0
  • Тесты используют DB 15 → изоляция

Cache Aside Pattern

Самый популярный паттерн кеширования:

  1. Запрос данных → проверяем кеш
  2. Если в кеше — возвращаем (cache hit)
  3. Если нет — читаем из БД, сохраняем в кеш (cache miss)

Реализация:

# cache.py
import json
from typing import Optional
 
class CachedRepository:
    def __init__(self, db_conn, redis_client):
        self.db = db_conn
        self.cache = redis_client
 
    async def get_user(self, user_id: str) -> Optional[dict]:
        """Получить пользователя с кешированием"""
        cache_key = f"user:{user_id}"
 
        # 1. Проверяем кеш
        cached = await self.cache.get(cache_key)
        if cached:
            return json.loads(cached)  # Cache hit
 
        # 2. Cache miss → читаем из БД
        user = await self.db.fetchrow(
            "SELECT id, name, email FROM users WHERE id = $1",
            user_id
        )
 
        if not user:
            return None
 
        user_dict = dict(user)
 
        # 3. Сохраняем в кеш с TTL
        await self.cache.setex(
            cache_key,
            300,  # TTL 5 минут
            json.dumps(user_dict)
        )
 
        return user_dict

Тестируем Cache Hit

# tests/test_cache_aside.py
import pytest
import json
 
@pytest.mark.asyncio
async def test_cache_hit(db_connection, redis_client):
    """Второй запрос возвращает данные из кеша"""
    # Подготовка БД
    await db_connection.execute(
        "INSERT INTO users (id, name, email) VALUES ($1, $2, $3)",
        "user-1", "Alice", "alice@test.com"
    )
 
    repo = CachedRepository(db_connection, redis_client)
 
    # Первый запрос → cache miss, читаем из БД
    user1 = await repo.get_user("user-1")
    assert user1["name"] == "Alice"
 
    # Удаляем из БД
    await db_connection.execute("DELETE FROM users WHERE id = $1", "user-1")
 
    # Второй запрос → cache hit, несмотря на удаление из БД
    user2 = await repo.get_user("user-1")
    assert user2 is not None  # ✅ Данные из кеша!
    assert user2["name"] == "Alice"

Результат: ✅ PASS — второй запрос получил данные из кеша, не из БД.

Тестируем TTL (Time To Live)

@pytest.mark.asyncio
async def test_cache_ttl(db_connection, redis_client):
    """Кеш протухает через TTL"""
    await db_connection.execute(
        "INSERT INTO users (id, name) VALUES ($1, $2)",
        "user-2", "Bob"
    )
 
    repo = CachedRepository(db_connection, redis_client)
 
    # Первый запрос → кеширование
    await repo.get_user("user-2")
 
    # Проверяем что кеш есть
    cached = await redis_client.get("user:user-2")
    assert cached is not None
 
    # Проверяем TTL
    ttl = await redis_client.ttl("user:user-2")
    assert 290 <= ttl <= 300  # TTL примерно 5 минут

Cache Invalidation

Золотое правило: при изменении данных в БД → инвалидируем кеш.

class CachedRepository:
    # ... (предыдущий код)
 
    async def update_user(self, user_id: str, name: str):
        """Обновить пользователя + инвалидация кеша"""
        # 1. Обновляем БД
        await self.db.execute(
            "UPDATE users SET name = $1 WHERE id = $2",
            name, user_id
        )
 
        # 2. Инвалидируем кеш
        cache_key = f"user:{user_id}"
        await self.cache.delete(cache_key)

Тест инвалидации:

@pytest.mark.asyncio
async def test_cache_invalidation(db_connection, redis_client):
    """После UPDATE кеш инвалидируется"""
    await db_connection.execute(
        "INSERT INTO users (id, name) VALUES ($1, $2)",
        "user-3", "Charlie"
    )
 
    repo = CachedRepository(db_connection, redis_client)
 
    # Кешируем
    user1 = await repo.get_user("user-3")
    assert user1["name"] == "Charlie"
 
    # Обновляем (инвалидируем кеш)
    await repo.update_user("user-3", "Charlie Updated")
 
    # Следующий запрос → cache miss, читаем из БД
    user2 = await repo.get_user("user-3")
    assert user2["name"] == "Charlie Updated"  # ✅ Новые данные

Полная изоляция: PostgreSQL + Redis

Комбинированная фикстура:

# conftest.py
import pytest
import asyncpg
import redis.asyncio as aioredis
 
@pytest.fixture
async def db_connection():
    """PostgreSQL с rollback"""
    conn = await asyncpg.connect(
        "postgresql://postgres:testpass@localhost:5432/todo_test"
    )
    await conn.execute("BEGIN")
    yield conn
    await conn.execute("ROLLBACK")
    await conn.close()
 
@pytest.fixture
async def redis_client():
    """Redis с flushdb"""
    client = aioredis.from_url(
        "redis://localhost:6379/15",
        decode_responses=True
    )
    yield client
    await client.flushdb()
    await client.close()
 
@pytest.fixture
async def cached_repo(db_connection, redis_client):
    """Репозиторий с кешем"""
    return CachedRepository(db_connection, redis_client)

Тест с обеими фикстурами:

@pytest.mark.asyncio
async def test_full_isolation(cached_repo):
    """PostgreSQL rollback + Redis flushdb"""
    # Создаём пользователя
    await cached_repo.db.execute(
        "INSERT INTO users (id, name) VALUES ($1, $2)",
        "user-4", "Dave"
    )
 
    # Кешируем
    user = await cached_repo.get_user("user-4")
    assert user["name"] == "Dave"
 
    # После теста: PostgreSQL откатится, Redis очистится
    # Следующий тест получит чистые БД и кеш

Практика: счетчик запросов к API

Реальный кейс: rate limiting через Redis.

Реализация:

# rate_limiter.py
class RateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
 
    async def check_limit(self, user_id: str, limit: int = 10) -> bool:
        """
        Проверить rate limit.
        Возвращает True если можно сделать запрос.
        """
        key = f"rate_limit:{user_id}"
 
        # Инкрементируем счетчик
        count = await self.redis.incr(key)
 
        # Устанавливаем TTL только для первого запроса
        if count == 1:
            await self.redis.expire(key, 60)  # 60 секунд
 
        return count <= limit

Тест:

@pytest.mark.asyncio
async def test_rate_limit(redis_client):
    """10 запросов в минуту"""
    limiter = RateLimiter(redis_client)
 
    # Первые 10 запросов — OK
    for i in range(10):
        allowed = await limiter.check_limit("user-alice")
        assert allowed is True
 
    # 11-й запрос — блокируется
    allowed = await limiter.check_limit("user-alice")
    assert allowed is False  # ✅ Rate limit сработал
 
@pytest.mark.asyncio
async def test_rate_limit_reset(redis_client):
    """Счетчик сбрасывается через TTL"""
    limiter = RateLimiter(redis_client)
 
    # Делаем 10 запросов
    for _ in range(10):
        await limiter.check_limit("user-bob")
 
    # Проверяем TTL
    ttl = await redis_client.ttl("rate_limit:user-bob")
    assert 50 <= ttl <= 60  # TTL примерно 60 секунд
 
    # В production через 60 секунд счетчик сбросится

Типичные ошибки

Ошибка #1: Забыли flushdb

@pytest.fixture
async def redis_client():
    client = aioredis.from_url(...)
    yield client
    await client.close()  # ❌ Нет flushdb!

Проблема: данные из предыдущего теста остаются в Redis.

Исправление:

yield client
await client.flushdb()  # ✅ Чистим
await client.close()

Ошибка #2: Используем DB 0 в тестах

# ❌ DB 0 используется production
client = aioredis.from_url("redis://localhost:6379/0")

Проблема: можете случайно очистить production данные.

Исправление: всегда используйте отдельную DB для тестов (например 15).

Ошибка #3: Не проверяем TTL

await self.cache.set(key, value)  # ❌ Нет TTL!

Проблема: данные никогда не удалятся, Redis переполнится.

Исправление:

await self.cache.setex(key, 300, value)  # ✅ TTL 5 минут

Мокирование Redis (опционально)

Если не хотите запускать реальный Redis:

pip install fakeredis[aioredis]
import pytest
from fakeredis import aioredis as fake_aioredis
 
@pytest.fixture
async def redis_client():
    """Fake Redis для unit-тестов"""
    client = fake_aioredis.FakeRedis(decode_responses=True)
    yield client
    await client.flushdb()
    await client.close()

Когда использовать:

  • ✅ Unit-тесты (быстрые, без Docker)
  • ❌ Integration-тесты (нужен реальный Redis)

Что вы узнали

Redis фикстуры с flushdb для изоляции ✅ Cache Aside pattern — самый популярный паттерн кеширования ✅ TTL и invalidation — управление жизненным циклом кеша ✅ Rate limiting — практический пример использования Redis ✅ Test isolation для PostgreSQL + Redis одновременно

Следующий урок

Теперь вы умеете работать с Redis в тестах. В следующем уроке ловим race conditions в кеше — самые коварные гонки в асинхронных приложениях.

Переходите к уроку 6: Гонки кеша: устаревшие данные в Redis

Async Redis фикстуры и Cache Aside — Pytest: Борьба с Race Conditions в Async-коде — Potapov.me