Async Redis фикстуры и Cache Aside
В этом уроке добавляем Redis в наш tech stack и настраиваем фикстуры для тестирования кеша. Это фундамент перед следующим уроком про гонки кеша.
Главная цель: научиться тестировать приложения с Redis, понять Cache Aside pattern и test isolation для кеша.
Зачем Redis в тестах?
Production реальность:
- 90% современных приложений используют Redis для кеширования
- Кеш вносит свои race conditions и flaky tests
- Нельзя тестировать только PostgreSQL, игнорируя Redis
Наша задача: научиться писать чистые, изолированные тесты с Redis.
Подготовка: запускаем Redis
Docker:
docker run -d --name pytest-redis \
-p 6379:6379 \
redis:7-alpineПроверяем:
docker exec -it pytest-redis redis-cli ping
# Должно вернуть: PONGУстанавливаем клиент:
pip install redis[asyncio]
# или
pip install aioredisRedis фикстура с изоляцией
Проблема test pollution:
Если не чистить Redis между тестами, данные одного теста влияют на другой.
Правильная фикстура:
# conftest.py
import pytest
import redis.asyncio as aioredis
@pytest.fixture
async def redis_client():
"""
Redis клиент с автоматической очисткой после теста.
Используем отдельную БД для тестов.
"""
client = aioredis.from_url(
"redis://localhost:6379/15", # DB 15 для тестов
decode_responses=True
)
yield client
# Чистим БД после теста
await client.flushdb()
await client.close()Почему DB 15?
- Redis имеет 16 БД (0-15)
- Production обычно использует DB 0
- Тесты используют DB 15 → изоляция
Cache Aside Pattern
Самый популярный паттерн кеширования:
- Запрос данных → проверяем кеш
- Если в кеше — возвращаем (cache hit)
- Если нет — читаем из БД, сохраняем в кеш (cache miss)
Реализация:
# cache.py
import json
from typing import Optional
class CachedRepository:
def __init__(self, db_conn, redis_client):
self.db = db_conn
self.cache = redis_client
async def get_user(self, user_id: str) -> Optional[dict]:
"""Получить пользователя с кешированием"""
cache_key = f"user:{user_id}"
# 1. Проверяем кеш
cached = await self.cache.get(cache_key)
if cached:
return json.loads(cached) # Cache hit
# 2. Cache miss → читаем из БД
user = await self.db.fetchrow(
"SELECT id, name, email FROM users WHERE id = $1",
user_id
)
if not user:
return None
user_dict = dict(user)
# 3. Сохраняем в кеш с TTL
await self.cache.setex(
cache_key,
300, # TTL 5 минут
json.dumps(user_dict)
)
return user_dictТестируем Cache Hit
# tests/test_cache_aside.py
import pytest
import json
@pytest.mark.asyncio
async def test_cache_hit(db_connection, redis_client):
"""Второй запрос возвращает данные из кеша"""
# Подготовка БД
await db_connection.execute(
"INSERT INTO users (id, name, email) VALUES ($1, $2, $3)",
"user-1", "Alice", "alice@test.com"
)
repo = CachedRepository(db_connection, redis_client)
# Первый запрос → cache miss, читаем из БД
user1 = await repo.get_user("user-1")
assert user1["name"] == "Alice"
# Удаляем из БД
await db_connection.execute("DELETE FROM users WHERE id = $1", "user-1")
# Второй запрос → cache hit, несмотря на удаление из БД
user2 = await repo.get_user("user-1")
assert user2 is not None # ✅ Данные из кеша!
assert user2["name"] == "Alice"Результат: ✅ PASS — второй запрос получил данные из кеша, не из БД.
Тестируем TTL (Time To Live)
@pytest.mark.asyncio
async def test_cache_ttl(db_connection, redis_client):
"""Кеш протухает через TTL"""
await db_connection.execute(
"INSERT INTO users (id, name) VALUES ($1, $2)",
"user-2", "Bob"
)
repo = CachedRepository(db_connection, redis_client)
# Первый запрос → кеширование
await repo.get_user("user-2")
# Проверяем что кеш есть
cached = await redis_client.get("user:user-2")
assert cached is not None
# Проверяем TTL
ttl = await redis_client.ttl("user:user-2")
assert 290 <= ttl <= 300 # TTL примерно 5 минутCache Invalidation
Золотое правило: при изменении данных в БД → инвалидируем кеш.
class CachedRepository:
# ... (предыдущий код)
async def update_user(self, user_id: str, name: str):
"""Обновить пользователя + инвалидация кеша"""
# 1. Обновляем БД
await self.db.execute(
"UPDATE users SET name = $1 WHERE id = $2",
name, user_id
)
# 2. Инвалидируем кеш
cache_key = f"user:{user_id}"
await self.cache.delete(cache_key)Тест инвалидации:
@pytest.mark.asyncio
async def test_cache_invalidation(db_connection, redis_client):
"""После UPDATE кеш инвалидируется"""
await db_connection.execute(
"INSERT INTO users (id, name) VALUES ($1, $2)",
"user-3", "Charlie"
)
repo = CachedRepository(db_connection, redis_client)
# Кешируем
user1 = await repo.get_user("user-3")
assert user1["name"] == "Charlie"
# Обновляем (инвалидируем кеш)
await repo.update_user("user-3", "Charlie Updated")
# Следующий запрос → cache miss, читаем из БД
user2 = await repo.get_user("user-3")
assert user2["name"] == "Charlie Updated" # ✅ Новые данныеПолная изоляция: PostgreSQL + Redis
Комбинированная фикстура:
# conftest.py
import pytest
import asyncpg
import redis.asyncio as aioredis
@pytest.fixture
async def db_connection():
"""PostgreSQL с rollback"""
conn = await asyncpg.connect(
"postgresql://postgres:testpass@localhost:5432/todo_test"
)
await conn.execute("BEGIN")
yield conn
await conn.execute("ROLLBACK")
await conn.close()
@pytest.fixture
async def redis_client():
"""Redis с flushdb"""
client = aioredis.from_url(
"redis://localhost:6379/15",
decode_responses=True
)
yield client
await client.flushdb()
await client.close()
@pytest.fixture
async def cached_repo(db_connection, redis_client):
"""Репозиторий с кешем"""
return CachedRepository(db_connection, redis_client)Тест с обеими фикстурами:
@pytest.mark.asyncio
async def test_full_isolation(cached_repo):
"""PostgreSQL rollback + Redis flushdb"""
# Создаём пользователя
await cached_repo.db.execute(
"INSERT INTO users (id, name) VALUES ($1, $2)",
"user-4", "Dave"
)
# Кешируем
user = await cached_repo.get_user("user-4")
assert user["name"] == "Dave"
# После теста: PostgreSQL откатится, Redis очистится
# Следующий тест получит чистые БД и кешПрактика: счетчик запросов к API
Реальный кейс: rate limiting через Redis.
Реализация:
# rate_limiter.py
class RateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
async def check_limit(self, user_id: str, limit: int = 10) -> bool:
"""
Проверить rate limit.
Возвращает True если можно сделать запрос.
"""
key = f"rate_limit:{user_id}"
# Инкрементируем счетчик
count = await self.redis.incr(key)
# Устанавливаем TTL только для первого запроса
if count == 1:
await self.redis.expire(key, 60) # 60 секунд
return count <= limitТест:
@pytest.mark.asyncio
async def test_rate_limit(redis_client):
"""10 запросов в минуту"""
limiter = RateLimiter(redis_client)
# Первые 10 запросов — OK
for i in range(10):
allowed = await limiter.check_limit("user-alice")
assert allowed is True
# 11-й запрос — блокируется
allowed = await limiter.check_limit("user-alice")
assert allowed is False # ✅ Rate limit сработал
@pytest.mark.asyncio
async def test_rate_limit_reset(redis_client):
"""Счетчик сбрасывается через TTL"""
limiter = RateLimiter(redis_client)
# Делаем 10 запросов
for _ in range(10):
await limiter.check_limit("user-bob")
# Проверяем TTL
ttl = await redis_client.ttl("rate_limit:user-bob")
assert 50 <= ttl <= 60 # TTL примерно 60 секунд
# В production через 60 секунд счетчик сброситсяТипичные ошибки
Ошибка #1: Забыли flushdb
@pytest.fixture
async def redis_client():
client = aioredis.from_url(...)
yield client
await client.close() # ❌ Нет flushdb!Проблема: данные из предыдущего теста остаются в Redis.
Исправление:
yield client
await client.flushdb() # ✅ Чистим
await client.close()Ошибка #2: Используем DB 0 в тестах
# ❌ DB 0 используется production
client = aioredis.from_url("redis://localhost:6379/0")Проблема: можете случайно очистить production данные.
Исправление: всегда используйте отдельную DB для тестов (например 15).
Ошибка #3: Не проверяем TTL
await self.cache.set(key, value) # ❌ Нет TTL!Проблема: данные никогда не удалятся, Redis переполнится.
Исправление:
await self.cache.setex(key, 300, value) # ✅ TTL 5 минутМокирование Redis (опционально)
Если не хотите запускать реальный Redis:
pip install fakeredis[aioredis]import pytest
from fakeredis import aioredis as fake_aioredis
@pytest.fixture
async def redis_client():
"""Fake Redis для unit-тестов"""
client = fake_aioredis.FakeRedis(decode_responses=True)
yield client
await client.flushdb()
await client.close()Когда использовать:
- ✅ Unit-тесты (быстрые, без Docker)
- ❌ Integration-тесты (нужен реальный Redis)
Что вы узнали
✅ Redis фикстуры с flushdb для изоляции ✅ Cache Aside pattern — самый популярный паттерн кеширования ✅ TTL и invalidation — управление жизненным циклом кеша ✅ Rate limiting — практический пример использования Redis ✅ Test isolation для PostgreSQL + Redis одновременно
Следующий урок
Теперь вы умеете работать с Redis в тестах. В следующем уроке ловим race conditions в кеше — самые коварные гонки в асинхронных приложениях.
Переходите к уроку 6: Гонки кеша: устаревшие данные в Redis