Гонки кеша: устаревшие данные в Redis
Этот урок объединяет знания из уроков 4 (гонки PostgreSQL) и 5 (основы Redis). Теперь ловим race conditions между кешем и БД — самые коварные гонки в асинхронных приложениях.
Главный фокус — cache invalidation: научиться тестировать race conditions между Redis и PostgreSQL и выбрать правильную стратегию кеширования для production.
Production Issue #2847
Репорт от Support:
"Customers report seeing outdated prices during Black Friday sale. Example: product page shows $99 (cached), but checkout shows $79 (actual DB price). 234 complaints in 2 hours, customers confused, some abandoned carts. Estimated revenue loss: $15K+."
Симптомы:
- Cache hit rate: 95% (отлично!)
- Но: 5-10% пользователей видят stale data
- Проблема усиливается при высокой нагрузке
- В логах: "price mismatch" между cache и DB
Root cause: Race condition при обновлении данных:
- Thread A читает product из Redis (old price: $99)
- Thread B обновляет product в PostgreSQL (new price: $79)
- Thread B забывает инвалидировать cache
- Thread A продолжает видеть old price из cache
Стоимость инцидента (Black Friday):
- Direct revenue loss: $15,000 (abandoned carts)
- Refunds для confused customers: $8,000
- Support time: 40 hours × $40/h = $1,600
- Engineering investigation: 12 hours × $100/h = $1,200
- Total impact: $25,800 за 2 часа
- Extrapolated annual cost: если проблема повторяется 1 раз/месяц = $300K/year
Что такое cache race condition
Cache race condition — ситуация, когда кеш и источник данных рассинхронизированы из-за конкурентного доступа.
Основные типы cache races:
1. Stale cache (устаревший кеш):
   - DB обновлена, cache НЕ инвалидирован
   - readers видят старые данные
2. Cache stampede (лавина промахов):
   - cache expires, 1000 requests одновременно идут в DB
   - DB overload
3. Write-write conflict:
   - два writer'а обновляют cache/DB в разном порядке
   - inconsistent state
4. Thundering herd:
   - множественные writer'ы пытаются заполнить пустой cache
   - race на запись
В этом уроке фокусируемся на #1 (stale cache) — самой частой и коварной проблеме. Прежде чем разворачивать полный стенд, посмотрим на механику гонки в миниатюре (набросок ниже).
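Минимальный набросок (допущение: вместо Redis — обычный dict, вместо PostgreSQL — переменная; все имена условные), воспроизводящий интерлив типа #1 без какой-либо инфраструктуры:
import asyncio

cache: dict[str, int] = {}   # "Redis"
db_price = 9900              # "строка в PostgreSQL": $99 в центах

async def reader() -> int:
    if "price" in cache:          # cache hit → возвращаем как есть
        return cache["price"]
    price = db_price              # cache miss → "читаем из DB"
    await asyncio.sleep(0.05)     # медленный запрос: writer успевает обновить DB
    cache["price"] = price        # пишем в cache уже УСТАРЕВШЕЕ значение
    return price

async def writer() -> None:
    global db_price
    await asyncio.sleep(0.01)
    db_price = 7900               # "UPDATE products SET price = ..." — cache не тронут

async def main() -> None:
    await asyncio.gather(reader(), writer())
    print(f"DB: {db_price}, cache: {cache['price']}")  # DB: 7900, cache: 9900

asyncio.run(main())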
Подготовка
Продолжаем проект из урока 01. У нас уже есть PostgreSQL + Redis + async fixtures с транзакциями.
Расширяем схему БД для products:
-- schema.sql (добавляем к существующей)
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
product_id VARCHAR(36) UNIQUE NOT NULL,
name VARCHAR(200) NOT NULL,
price INTEGER NOT NULL, -- В центах для точности
stock INTEGER NOT NULL DEFAULT 0,
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_products_product_id ON products(product_id);
Применяем:
docker exec -i pytest-postgres psql -U postgres -d todo_test < schema.sql
Зависимости уже установлены (из урока 01):
pip install asyncpg aioredis pytest-asyncio
Шаг 1. Воспроизводим stale cache
Создайте tests/integration/test_cache_race_naive.py. Воспроизводим реальную проблему: читатель видит старые данные после обновления.
import asyncio
import asyncpg
import pytest
import json
class NaiveProductService:
"""
ПЛОХАЯ реализация: нет синхронизации между cache и DB!
Проблема: update в DB не инвалидирует cache → stale reads.
"""
def __init__(self, db_conn, redis):
self.db = db_conn
self.redis = redis
async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
"""Создаёт продукт в DB и кеше"""
await self.db.execute(
"INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
product_id, name, price, stock
)
product = {
"product_id": product_id,
"name": name,
"price": price,
"stock": stock
}
# Кешируем на 60 секунд
await self.redis.setex(
f"product:{product_id}",
60,
json.dumps(product)
)
return product
async def get_product(self, product_id: str) -> dict:
"""
Читает продукт: сначала cache, потом DB.
        ПРОБЛЕМА: если cache hit, возвращаем, не проверяя актуальность!
"""
# 1. Пытаемся из cache
cached = await self.redis.get(f"product:{product_id}")
if cached:
return json.loads(cached)
# 2. Cache miss → идём в DB
row = await self.db.fetchrow(
"SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
product_id
)
if not row:
raise KeyError(f"Product {product_id} not found")
product = dict(row)
# 3. Сохраняем в cache
await self.redis.setex(
f"product:{product_id}",
60,
json.dumps(product)
)
return product
async def update_product_price(self, product_id: str, new_price: int) -> None:
"""
ОПАСНАЯ РЕАЛИЗАЦИЯ: обновляет DB, но НЕ инвалидирует cache!
Это и есть источник stale cache race condition.
"""
await self.db.execute(
"UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
new_price, product_id
)
# ❌ БАГ: НЕ инвалидируем cache!
# Старые данные остаются в Redis до expiry (60 секунд)
# ФИКСТУРЫ ИЗ УРОКА 01
@pytest.fixture(scope="session")
async def db_pool():
"""Session-scoped connection pool"""
pool = await asyncpg.create_pool(
host="localhost",
port=5432,
user="postgres",
password="testpass",
database="todo_test",
min_size=2,
max_size=10
)
yield pool
await pool.close()
@pytest.fixture
async def db_connection(db_pool):
"""Function-scoped connection с транзакцией"""
async with db_pool.acquire() as conn:
tx = conn.transaction()
await tx.start()
yield conn
await tx.rollback()
@pytest.fixture
async def redis_client():
"""Redis client с cleanup"""
import aioredis
redis = await aioredis.from_url(
"redis://localhost:6379",
decode_responses=True
)
yield redis
await redis.flushdb()
await redis.close()
@pytest.fixture
async def naive_product_service(db_connection, redis_client):
"""Product service с наивной (плохой) реализацией"""
return NaiveProductService(db_connection, redis_client)
# ТЕСТЫ КОТОРЫЕ ДЕМОНСТРИРУЮТ STALE CACHE RACE
@pytest.mark.asyncio
@pytest.mark.integration
async def test_stale_cache_after_price_update(naive_product_service):
"""
Тест #1: простейший сценарий stale cache.
1. Создаём продукт с price=$100
2. Читаем из cache → $100 ✓
3. Обновляем price в DB → $50
4. Читаем снова → всё ещё $100 ❌ (stale cache!)
"""
service = naive_product_service
# 1. Создаём продукт
await service.create_product(
product_id="laptop-x1",
name="Gaming Laptop",
price=10000, # $100.00 в центах
stock=50
)
# 2. Первое чтение (cache hit после create)
product = await service.get_product("laptop-x1")
assert product["price"] == 10000
# 3. Admin обновляет цену в DB
await service.update_product_price("laptop-x1", 5000) # $50.00
# 4. User читает продукт снова
product = await service.get_product("laptop-x1")
# ❌ БАГ: видим СТАРУЮ цену из cache!
# Expected: 5000 (новая цена из DB)
# Actual: 10000 (старая цена из cache)
assert product["price"] == 5000, (
f"Stale cache detected! Expected price=$50 (DB), "
f"but got price=${product['price']/100} (cached)"
)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_concurrent_read_during_write(naive_product_service):
"""
Тест #2: race condition — reader и writer одновременно.
Timeline:
- t=0: Reader starts (cache miss, reads DB)
- t=0: Writer updates DB + cache
- t=1: Reader finishes (writes OLD value to cache!)
- Result: stale cache победил
"""
service = naive_product_service
# Создаём продукт
await service.create_product("phone-z1", "Flagship Phone", 80000, 100)
    # Очищаем cache, чтобы создать cache miss
await service.redis.delete("product:phone-z1")
async def slow_reader():
"""Reader с задержкой (имитирует медленный DB query)"""
# Читаем из DB (cache miss)
row = await service.db.fetchrow(
"SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
"phone-z1"
)
product = dict(row)
# ЗАДЕРЖКА: тут writer успевает обновить price!
await asyncio.sleep(0.05)
# Пишем СТАРЫЕ данные в cache
await service.redis.setex(
f"product:{product['product_id']}",
60,
json.dumps(product)
)
return product
async def fast_writer():
"""Writer обновляет price"""
await asyncio.sleep(0.01) # Даём reader начать
await service.update_product_price("phone-z1", 60000) # $600
# Запускаем параллельно
reader_result, _ = await asyncio.gather(
slow_reader(),
fast_writer()
)
# Reader вернул старую цену (прочитал до update)
assert reader_result["price"] == 80000
# НО! Проверяем что в cache сейчас
cached = await service.get_product("phone-z1")
# ❌ БАГ: в cache СТАРАЯ цена, хотя в DB новая!
# Это классическая race condition
assert cached["price"] == 60000, (
f"Race condition! Writer updated to $600, "
f"but cache has stale ${cached['price']/100} from slow reader"
)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_multiple_concurrent_updates(naive_product_service):
"""
Тест #3: множественные обновления → непредсказуемое состояние cache.
3 writer'а одновременно обновляют price.
Результат: last write wins, но в cache может быть любой из трёх!
"""
service = naive_product_service
await service.create_product("monitor-4k", "4K Monitor", 50000, 20)
async def update_price(new_price: int):
await service.update_product_price("monitor-4k", new_price)
        await asyncio.sleep(0.01)  # Даём операциям время перемешаться
# 3 параллельных обновления
await asyncio.gather(
update_price(40000), # $400
update_price(45000), # $450
update_price(42000), # $420
)
# В DB: последний writer (один из трёх)
db_row = await service.db.fetchrow(
"SELECT price FROM products WHERE product_id = $1",
"monitor-4k"
)
db_price = db_row["price"]
# В cache: может быть ЛЮБОЙ из трёх (или даже original $500!)
cached = await service.get_product("monitor-4k")
cached_price = cached["price"]
# ❌ БАГ: cache и DB рассинхронизированы
assert db_price == cached_price, (
f"Cache inconsistency! DB has ${db_price/100}, "
f"but cache has ${cached_price/100}"
)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_real_world_checkout_scenario(naive_product_service):
"""
Тест #4: реальный сценарий Black Friday.
User flow:
1. View product page (reads from cache: $99)
2. Admin applies discount (updates DB: $79)
3. User adds to cart (still sees cached $99)
4. User proceeds to checkout (reads fresh from DB: $79)
5. User confused: "Price changed in my cart!"
"""
service = naive_product_service
# Black Friday product
await service.create_product(
"headphones-pro",
"Pro Headphones",
9900, # $99
500
)
# 1. User views product page
product_view = await service.get_product("headphones-pro")
assert product_view["price"] == 9900
user_saw_price = product_view["price"]
# 2. Admin applies Black Friday discount (20% off)
await service.update_product_price("headphones-pro", 7900) # $79
# 3. User adds to cart (читает из cache — stale!)
product_in_cart = await service.get_product("headphones-pro")
cart_price = product_in_cart["price"]
# 4. User proceeds to checkout (fresh read from DB)
# Имитируем: checkout service ВСЕГДА читает из DB для точности
db_row = await service.db.fetchrow(
"SELECT price FROM products WHERE product_id = $1",
"headphones-pro"
)
checkout_price = db_row["price"]
# ❌ ПРОБЛЕМА: цены не совпадают!
# User видел $99, в корзине $99, но checkout показывает $79
price_mismatch = cart_price != checkout_price
if price_mismatch:
print(f"⚠️ Price mismatch detected!")
print(f" User saw: ${user_saw_price/100}")
print(f" Cart price: ${cart_price/100}")
print(f" Checkout price: ${checkout_price/100}")
print(f" → Customer confused, potential cart abandonment")
# Это реальная проблема из Production Issue #2847
assert not price_mismatch, (
f"Black Friday incident reproduced! "
f"Cart shows ${cart_price/100}, but checkout is ${checkout_price/100}"
    )
Запуск тестов:
# Все тесты должны упасть с stale cache errors
pytest tests/integration/test_cache_race_naive.py -v -s
# Ожидаемый результат: 4/4 FAILED
Ожидаемые ошибки:
FAILED test_stale_cache_after_price_update - AssertionError:
Stale cache detected! Expected price=$50 (DB), but got price=$100 (cached)
FAILED test_concurrent_read_during_write - AssertionError:
Race condition! Writer updated to $600, but cache has stale $800
FAILED test_multiple_concurrent_updates - AssertionError:
Cache inconsistency! DB has $420, but cache has $500
FAILED test_real_world_checkout_scenario - AssertionError:
Black Friday incident reproduced! Cart shows $99, but checkout is $79
Шаг 2. Диагностика: видим cache race в действии
Добавим observability, чтобы понять, как именно происходит рассинхронизация.
# tests/integration/test_cache_race_debug.py
import asyncio
import asyncpg
import pytest
import json
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ObservableProductService:
"""
Product service с полным логированием всех cache/DB операций.
"""
def __init__(self, db_conn, redis):
self.db = db_conn
self.redis = redis
self.operation_log = [] # История всех операций для анализа
def _log_operation(self, operation: str, **kwargs):
"""Логируем каждую операцию с timestamp"""
entry = {
"timestamp": datetime.now().isoformat(),
"operation": operation,
**kwargs
}
self.operation_log.append(entry)
logger.info(f"[{operation}] {kwargs}")
async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
self._log_operation("CREATE_START", product_id=product_id, price=price)
await self.db.execute(
"INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
product_id, name, price, stock
)
self._log_operation("DB_INSERT", product_id=product_id, price=price)
product = {"product_id": product_id, "name": name, "price": price, "stock": stock}
await self.redis.setex(f"product:{product_id}", 60, json.dumps(product))
self._log_operation("CACHE_SET", product_id=product_id, price=price, ttl=60)
return product
async def get_product(self, product_id: str) -> dict:
self._log_operation("GET_START", product_id=product_id)
# Проверяем cache
cached = await self.redis.get(f"product:{product_id}")
if cached:
product = json.loads(cached)
self._log_operation("CACHE_HIT", product_id=product_id, price=product["price"])
return product
self._log_operation("CACHE_MISS", product_id=product_id)
# Читаем из DB
row = await self.db.fetchrow(
"SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
product_id
)
if not row:
raise KeyError(f"Product {product_id} not found")
product = dict(row)
self._log_operation("DB_READ", product_id=product_id, price=product["price"])
# Пишем в cache
await self.redis.setex(f"product:{product_id}", 60, json.dumps(product))
self._log_operation("CACHE_SET", product_id=product_id, price=product["price"], ttl=60)
return product
async def update_product_price(self, product_id: str, new_price: int) -> None:
self._log_operation("UPDATE_START", product_id=product_id, new_price=new_price)
await self.db.execute(
"UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
new_price, product_id
)
self._log_operation("DB_UPDATE", product_id=product_id, new_price=new_price)
# ❌ БАГ: не инвалидируем cache!
def print_operation_timeline(self):
"""Выводит timeline всех операций для debugging"""
print("\n" + "="*80)
print("OPERATION TIMELINE (showing cache race)")
print("="*80)
for i, op in enumerate(self.operation_log):
timestamp = op["timestamp"].split("T")[1][:12] # HH:MM:SS.mmm
operation = op["operation"]
details = {k: v for k, v in op.items() if k not in ["timestamp", "operation"]}
print(f"{i+1:2d}. [{timestamp}] {operation:20s} {details}")
print("="*80 + "\n")
@pytest.fixture
async def observable_product_service(db_connection, redis_client):
"""Service с логированием"""
return ObservableProductService(db_connection, redis_client)
@pytest.mark.asyncio
async def test_visualize_stale_cache_race(observable_product_service):
"""
Debug тест: визуализируем КАК происходит stale cache.
"""
service = observable_product_service
# Создаём продукт
await service.create_product("debug-product", "Test Product", 10000, 100)
# Читаем (cache hit)
await service.get_product("debug-product")
# Обновляем price
await service.update_product_price("debug-product", 5000)
# Читаем снова (stale cache hit!)
product = await service.get_product("debug-product")
# Выводим timeline
service.print_operation_timeline()
# Показываем проблему
print(f"\n⚠️ STALE CACHE DETECTED:")
print(f" DB price: $50.00 (updated)")
print(f" Cache price: ${product['price']/100:.2f} (stale!)")
print(f" Cache should have been invalidated after UPDATE_START\n")
@pytest.mark.asyncio
async def test_visualize_concurrent_race(observable_product_service):
"""
Debug тест: визуализируем race между reader и writer.
"""
service = observable_product_service
await service.create_product("race-product", "Race Product", 10000, 50)
await service.redis.delete("product:race-product") # Force cache miss
async def reader():
await asyncio.sleep(0.001)
return await service.get_product("race-product")
async def writer():
await asyncio.sleep(0.002)
await service.update_product_price("race-product", 8000)
# Запускаем параллельно
await asyncio.gather(reader(), writer())
# Timeline покажет порядок операций
service.print_operation_timeline()
print(f"\n📊 ANALYSIS:")
print(f" Look at the timeline above:")
print(f" - CACHE_MISS → DB_READ → (writer UPDATE) → CACHE_SET")
print(f" - Reader cached OLD value AFTER writer updated DB")
print(f" - This is a classic race condition\n")Запуск с выводом timeline:
pytest tests/integration/test_cache_race_debug.py::test_visualize_stale_cache_race -v -s
Что вы увидите:
================================================================================
OPERATION TIMELINE (showing cache race)
================================================================================
1. [10:15:23.001] CREATE_START {'product_id': 'debug-product', 'price': 10000}
2. [10:15:23.012] DB_INSERT {'product_id': 'debug-product', 'price': 10000}
3. [10:15:23.015] CACHE_SET {'product_id': 'debug-product', 'price': 10000, 'ttl': 60}
4. [10:15:23.020] GET_START {'product_id': 'debug-product'}
5. [10:15:23.022] CACHE_HIT {'product_id': 'debug-product', 'price': 10000}
6. [10:15:23.025] UPDATE_START {'product_id': 'debug-product', 'new_price': 5000}
7. [10:15:23.030] DB_UPDATE {'product_id': 'debug-product', 'new_price': 5000}
8. [10:15:23.035] GET_START {'product_id': 'debug-product'}
9. [10:15:23.037] CACHE_HIT {'product_id': 'debug-product', 'price': 10000} ← STALE!
================================================================================
⚠️ STALE CACHE DETECTED:
DB price: $50.00 (updated)
Cache price: $100.00 (stale!)
   Cache should have been invalidated right after DB_UPDATE
Проблема очевидна: после DB_UPDATE (запись 7 в timeline) мы не инвалидировали cache, поэтому следующий GET_START (запись 8) получил CACHE_HIT со старыми данными.
Шаг 3. Фикс #1 — Cache Aside (manual invalidation)
Самое простое решение: явно удаляем cache после каждого update.
# tests/integration/test_cache_aside_pattern.py
import asyncio
import asyncpg
import pytest
import json
class ProductServiceCacheAside:
"""
Cache Aside pattern: читаем из cache, пишем в DB + инвалидируем cache.
Strategy:
- Read: cache → DB (if miss) → populate cache
- Write: DB → invalidate cache
- Next read: cache miss → fresh from DB → populate
"""
def __init__(self, db_conn, redis):
self.db = db_conn
self.redis = redis
async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
await self.db.execute(
"INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
product_id, name, price, stock
)
product = {"product_id": product_id, "name": name, "price": price, "stock": stock}
# Кешируем после create
await self.redis.setex(f"product:{product_id}", 60, json.dumps(product))
return product
async def get_product(self, product_id: str) -> dict:
# 1. Try cache
cached = await self.redis.get(f"product:{product_id}")
if cached:
return json.loads(cached)
# 2. Cache miss → DB
row = await self.db.fetchrow(
"SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
product_id
)
if not row:
raise KeyError(f"Product {product_id} not found")
product = dict(row)
# 3. Populate cache
await self.redis.setex(f"product:{product_id}", 60, json.dumps(product))
return product
async def update_product_price(self, product_id: str, new_price: int) -> None:
"""
✅ ПРАВИЛЬНАЯ РЕАЛИЗАЦИЯ: обновляем DB + инвалидируем cache.
"""
# 1. Update DB
await self.db.execute(
"UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
new_price, product_id
)
# 2. ✅ Инвалидируем cache
await self.redis.delete(f"product:{product_id}")
# Следующий read будет cache miss → fresh data from DB
@pytest.fixture
async def cache_aside_service(db_connection, redis_client):
"""Service с Cache Aside pattern"""
return ProductServiceCacheAside(db_connection, redis_client)
# ТЕСТЫ ПРОВЕРЯЮТ ЧТО CACHE ASIDE РАБОТАЕТ
@pytest.mark.asyncio
@pytest.mark.integration
async def test_cache_aside_no_stale_reads(cache_aside_service):
"""
Тест #1: Cache Aside решает stale cache проблему.
"""
service = cache_aside_service
# Create
await service.create_product("laptop-fixed", "Laptop", 10000, 50)
# Read (cache hit)
product = await service.get_product("laptop-fixed")
assert product["price"] == 10000
# Update
await service.update_product_price("laptop-fixed", 5000)
# Read again (cache miss → fresh from DB)
product = await service.get_product("laptop-fixed")
# ✅ NO stale cache!
assert product["price"] == 5000
@pytest.mark.asyncio
@pytest.mark.integration
async def test_cache_aside_under_concurrent_load(cache_aside_service):
"""
Тест #2: Cache Aside стабилен при параллельных updates.
"""
service = cache_aside_service
await service.create_product("monitor-fixed", "Monitor", 50000, 20)
# 10 параллельных updates
prices = [45000, 46000, 47000, 48000, 49000, 50000, 51000, 52000, 53000, 54000]
await asyncio.gather(*[
service.update_product_price("monitor-fixed", price)
for price in prices
])
# Читаем финальный результат
product = await service.get_product("monitor-fixed")
# Проверяем что cache и DB синхронизированы
db_row = await service.db.fetchrow(
"SELECT price FROM products WHERE product_id = $1",
"monitor-fixed"
)
# ✅ Cache = DB (нет рассинхронизации)
assert product["price"] == db_row["price"]
@pytest.mark.asyncio
@pytest.mark.integration
async def test_cache_aside_black_friday_scenario(cache_aside_service):
"""
Тест #3: Black Friday сценарий теперь работает корректно.
"""
service = cache_aside_service
# Setup
await service.create_product("headphones-fixed", "Headphones", 9900, 500)
# User views (cache hit)
product_view = await service.get_product("headphones-fixed")
assert product_view["price"] == 9900
# Admin applies discount
await service.update_product_price("headphones-fixed", 7900)
# User adds to cart (cache miss → fresh price!)
product_in_cart = await service.get_product("headphones-fixed")
cart_price = product_in_cart["price"]
# Checkout
db_row = await service.db.fetchrow(
"SELECT price FROM products WHERE product_id = $1",
"headphones-fixed"
)
checkout_price = db_row["price"]
# ✅ NO price mismatch!
    assert cart_price == checkout_price == 7900
Запуск фикс-тестов:
pytest tests/integration/test_cache_aside_pattern.py -v
# ✅ PASS (3/3) — все тесты зелёные!
Почему Cache Aside работает:
- Update → DB write + cache invalidation (последовательно в одной функции: не атомарно, но окно рассинхронизации минимально)
- Next read → cache miss (потому что delete)
- Fetch from DB → получаем свежие данные
- Repopulate cache → кешируем актуальные данные
Плюсы:
- ✅ Простая реализация
- ✅ Всегда актуальные данные
- ✅ Работает при любой нагрузке
Минусы:
- ❌ Каждый update → cache miss → DB query (медленнее)
- ❌ Cache stampede risk — после invalidation волна readers может пойти в DB одновременно (как смягчить — см. набросок ниже)
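Набросок защиты от stampede через «single flight» lock в Redis (допущение: клиент и схема из фикстур выше; имена ключей, TTL и функция get_product_with_lock — гипотетические, не из урока):
import asyncio
import json

async def get_product_with_lock(redis, db, product_id: str) -> dict:
    """Cache Aside + lock: после invalidation в DB идёт только один воркер."""
    key = f"product:{product_id}"
    cached = await redis.get(key)
    if cached:
        return json.loads(cached)
    # SET NX EX — атомарная попытка взять короткий lock
    got_lock = await redis.set(f"lock:{key}", "1", nx=True, ex=5)
    if not got_lock:
        # Кто-то уже заполняет cache — ждём и перечитываем
        await asyncio.sleep(0.05)
        cached = await redis.get(key)
        if cached:
            return json.loads(cached)
        # Держатель lock'а мог упасть — читаем из DB сами (fallback)
    row = await db.fetchrow(
        "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
        product_id,
    )
    if not row:
        raise KeyError(f"Product {product_id} not found")
    product = dict(row)
    await redis.setex(key, 60, json.dumps(product))
    if got_lock:
        await redis.delete(f"lock:{key}")
    return product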
Шаг 4. Фикс #2 — Write Through (обновление cache при записи)
Write Through pattern: при каждой записи обновляем и DB, и cache (вместо delete).
# tests/integration/test_write_through_pattern.py
import asyncio
import asyncpg
import pytest
import json
class ProductServiceWriteThrough:
"""
Write Through pattern: пишем в DB и cache одновременно.
Strategy:
- Read: cache → DB (if miss) → populate cache
- Write: DB → update cache (не delete!)
    - Cache обновляется при каждой записи → почти всегда актуален
"""
def __init__(self, db_conn, redis):
self.db = db_conn
self.redis = redis
async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
await self.db.execute(
"INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
product_id, name, price, stock
)
product = {"product_id": product_id, "name": name, "price": price, "stock": stock}
await self.redis.setex(f"product:{product_id}", 300, json.dumps(product))
return product
async def get_product(self, product_id: str) -> dict:
cached = await self.redis.get(f"product:{product_id}")
if cached:
return json.loads(cached)
row = await self.db.fetchrow(
"SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
product_id
)
if not row:
raise KeyError(f"Product {product_id} not found")
product = dict(row)
await self.redis.setex(f"product:{product_id}", 300, json.dumps(product))
return product
async def update_product_price(self, product_id: str, new_price: int) -> None:
"""
        ✅ Write Through: обновляем DB и сразу же cache (две последовательные операции).
"""
# 1. Update DB
await self.db.execute(
"UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
new_price, product_id
)
# 2. Читаем полный объект из DB (для cache)
row = await self.db.fetchrow(
"SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
product_id
)
if row:
product = dict(row)
# 3. ✅ Обновляем cache (не delete!)
await self.redis.setex(f"product:{product_id}", 300, json.dumps(product))
@pytest.fixture
async def write_through_service(db_connection, redis_client):
"""Service с Write Through pattern"""
return ProductServiceWriteThrough(db_connection, redis_client)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_write_through_no_cache_miss(write_through_service):
"""
Тест #1: Write Through избегает cache miss после update.
"""
service = write_through_service
await service.create_product("keyboard-wt", "Keyboard", 15000, 100)
# Update
await service.update_product_price("keyboard-wt", 12000)
# Проверяем что в cache УЖЕ новая цена (без DB query!)
cached = await service.redis.get("product:keyboard-wt")
cached_product = json.loads(cached)
# ✅ Cache обновлён без miss
assert cached_product["price"] == 12000
@pytest.mark.asyncio
@pytest.mark.integration
async def test_write_through_performance(write_through_service):
"""
Тест #2: Write Through быстрее (нет cache miss).
Сравниваем: Cache Aside vs Write Through.
"""
service = write_through_service
await service.create_product("mouse-wt", "Mouse", 5000, 200)
import time
# 100 updates + reads
start = time.time()
for i in range(100):
await service.update_product_price("mouse-wt", 5000 + i)
await service.get_product("mouse-wt") # ← Cache hit (fast!)
duration = time.time() - start
print(f"\n⚡ Write Through: 100 update+read cycles in {duration:.3f}s")
print(f" Average: {duration/100*1000:.1f}ms per cycle")
# Write Through должен быть < 3s (cache hits)
    assert duration < 3.0, f"Too slow: {duration:.3f}s"
Запуск:
pytest tests/integration/test_write_through_pattern.py -v -s
Результат:
⚡ Write Through: 100 update+read cycles in 0.845s
Average: 8.5ms per cycle
✅ PASS (2/2)
Write Through плюсы:
- ✅ Нет cache miss после update (чтения быстрее)
- ✅ Cache практически всегда актуален
- ✅ Меньше нагрузка на DB от читателей (нет волны miss'ов после записи)
Write Through минусы:
- ❌ Сложнее реализация (нужно читать full object из DB)
- ❌ Update медленнее (DB write + DB read + cache write)
- ❌ Между DB write и cache write остаётся короткое окно гонки при конкурентных updates
- ❌ Если cache недоступен → update падает целиком (нужен fallback — см. набросок ниже)
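Набросок fallback'а (допущение: сервис из этого шага; политика «DB — источник истины, cache — best effort» и имя функции — гипотетические):
import json
import logging

logger = logging.getLogger(__name__)

async def update_price_with_fallback(service, product_id: str, new_price: int) -> None:
    # 1. DB пишем всегда — это источник истины
    await service.db.execute(
        "UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
        new_price, product_id,
    )
    # 2. Cache обновляем best effort: ошибка Redis не должна ронять update
    try:
        row = await service.db.fetchrow(
            "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
            product_id,
        )
        await service.redis.setex(f"product:{product_id}", 300, json.dumps(dict(row)))
    except Exception:
        # Допущение: короткий период staleness лучше, чем failed write
        logger.warning("Cache update failed for %s; stale until TTL/next miss", product_id)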
Шаг 5. Фикс #3 — TTL-based (eventual consistency)
TTL-based pattern: не инвалидируем cache, полагаемся на автоматический expiry.
# tests/integration/test_ttl_based_pattern.py
import asyncio
import asyncpg
import pytest
import json
class ProductServiceTTL:
"""
TTL-based pattern: eventual consistency через короткий TTL.
Strategy:
- Read: cache → DB (if miss/expired) → populate with short TTL
- Write: DB only (не трогаем cache!)
- Cache expires через 5-10 секунд → auto-refresh
"""
def __init__(self, db_conn, redis, cache_ttl=5):
self.db = db_conn
self.redis = redis
self.cache_ttl = cache_ttl # Короткий TTL для fast expiry
async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
await self.db.execute(
"INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
product_id, name, price, stock
)
product = {"product_id": product_id, "name": name, "price": price, "stock": stock}
# Короткий TTL
await self.redis.setex(f"product:{product_id}", self.cache_ttl, json.dumps(product))
return product
async def get_product(self, product_id: str) -> dict:
cached = await self.redis.get(f"product:{product_id}")
if cached:
return json.loads(cached)
row = await self.db.fetchrow(
"SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
product_id
)
if not row:
raise KeyError(f"Product {product_id} not found")
product = dict(row)
# Короткий TTL для быстрого обновления
await self.redis.setex(f"product:{product_id}", self.cache_ttl, json.dumps(product))
return product
async def update_product_price(self, product_id: str, new_price: int) -> None:
"""
✅ TTL-based: обновляем только DB, НЕ трогаем cache.
Cache обновится автоматически через TTL expiry.
"""
await self.db.execute(
"UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
new_price, product_id
)
# НЕ инвалидируем и НЕ обновляем cache!
# Полагаемся на TTL expiry (5 секунд)
@pytest.fixture
async def ttl_service(db_connection, redis_client):
"""Service с TTL-based pattern (1 sec TTL для тестов)"""
return ProductServiceTTL(db_connection, redis_client, cache_ttl=1)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_ttl_eventual_consistency(ttl_service):
"""
Тест #1: TTL pattern → eventual consistency.
Timeline:
- t=0: create product (price=$100)
- t=0.1: update price to $50
- t=0.2: read → stale $100 (cache not expired yet)
- t=1.1: read → fresh $50 (cache expired, read from DB)
"""
service = ttl_service
# Create
await service.create_product("webcam-ttl", "Webcam", 10000, 50)
# Update immediately
await service.update_product_price("webcam-ttl", 5000)
# Read сразу после update (cache ещё не expired)
product_stale = await service.get_product("webcam-ttl")
# ⚠️ Может быть stale (зависит от timing)
is_stale = product_stale["price"] == 10000
is_fresh = product_stale["price"] == 5000
if is_stale:
print(f"\n⏱️ Stale read detected (expected with TTL pattern)")
# Ждём expiry
await asyncio.sleep(1.5)
# Теперь cache expired → fresh from DB
product_fresh = await service.get_product("webcam-ttl")
assert product_fresh["price"] == 5000
print(f"✅ After TTL expiry: fresh data from DB")
    else:
        assert is_fresh  # cache уже успел истечь — данные свежие
        print(f"\n✅ Got fresh data immediately (cache already expired)")
@pytest.mark.asyncio
@pytest.mark.integration
async def test_ttl_acceptable_staleness_window(ttl_service):
"""
    Тест #2: TTL pattern приемлем, если staleness window короткий.
Use case: показываем "примерную" цену, не критично если отстаёт на 1-5 сек.
"""
service = ttl_service
await service.create_product("charger-ttl", "Charger", 3000, 300)
# Множественные updates
for new_price in range(3000, 3100, 10):
await service.update_product_price("charger-ttl", new_price)
# Читаем сразу (может быть устаревшая цена)
product = await service.get_product("charger-ttl")
# Допускаем staleness до 1 секунды
staleness_acceptable = True # Для product listings это OK
if staleness_acceptable:
print(f"\n📊 TTL pattern suitable for:")
print(f" - Product listings (approximate prices)")
print(f" - Analytics dashboards (near-real-time)")
print(f" - Non-critical data (news feeds, recommendations)")
print(f" ")
print(f" NOT suitable for:")
print(f" - Payments (need exact price)")
print(f" - Inventory (can't sell out-of-stock items)")
print(f" - User balances (need accuracy)")Trade-offs таблица:
| Strategy | Consistency | Performance | Complexity | Use Cases |
|---|---|---|---|---|
| Cache Aside | ✅ Strong | 🟡 Medium (cache miss) | 🟢 Simple | Критичные данные (payments, inventory) |
| Write Through | ✅ Strong | ✅ High (no miss) | 🟡 Medium | Часто читаемые, редко обновляемые |
| TTL-based | 🟡 Eventual | ✅ High | 🟢 Simple | Некритичные данные (listings, analytics) |
Шаг 6. Production patterns
Distributed cache invalidation
Проблема: несколько серверов держат локальные (in-memory) кеши поверх общего Redis — как синхронизировать invalidation?
Решение: Redis Pub/Sub
class ProductServiceWithPubSub:
"""
Distributed cache invalidation через Redis Pub/Sub.
Когда один сервер обновляет продукт:
1. Update DB
2. Delete local cache
3. Publish "product:invalidated:{product_id}" в Redis
4. Другие серверы получают сообщение → инвалидируют свой cache
"""
def __init__(self, db_conn, redis):
self.db = db_conn
self.redis = redis
self.local_cache = {} # In-memory cache для примера
async def start_listening(self):
"""Subscribe на invalidation events"""
pubsub = self.redis.pubsub()
await pubsub.subscribe("product:invalidations")
async for message in pubsub.listen():
if message["type"] == "message":
product_id = message["data"]
# Инвалидируем local cache
self.local_cache.pop(product_id, None)
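
    # Допущение (не из урока): слушателя обычно запускают фоновой задачей
    # при старте сервиса, например:
    #   self._listener_task = asyncio.create_task(self.start_listening())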
async def update_product_price(self, product_id: str, new_price: int) -> None:
# Update DB
await self.db.execute(
"UPDATE products SET price = $1 WHERE product_id = $2",
new_price, product_id
)
# Invalidate local cache
self.local_cache.pop(product_id, None)
# Invalidate Redis cache
await self.redis.delete(f"product:{product_id}")
# ✅ Notify other servers
        await self.redis.publish("product:invalidations", product_id)
Monitoring & Metrics
import json
import time
from prometheus_client import Counter, Histogram, Gauge
# Cache metrics
cache_hits = Counter("cache_hits_total", "Cache hits", ["cache_key_prefix"])
cache_misses = Counter("cache_misses_total", "Cache misses", ["cache_key_prefix"])
cache_ttl_expired = Counter("cache_ttl_expired_total", "Cache entries expired due to TTL")
stale_reads = Counter("cache_stale_reads_total", "Stale cache reads detected")
cache_get_duration = Histogram(
"cache_get_duration_seconds",
"Time spent getting from cache",
["status"] # hit or miss
)
cache_size = Gauge("cache_size_bytes", "Current cache size")
class MonitoredProductService:
    """Product service с полной observability (db/redis — как в фикстурах выше)"""
    def __init__(self, db_conn, redis):
        self.db = db_conn
        self.redis = redis

    async def get_product(self, product_id: str) -> dict:
        start = time.time()
        cached = await self.redis.get(f"product:{product_id}")
        if cached:
            duration = time.time() - start
            cache_hits.labels(cache_key_prefix="product").inc()
            cache_get_duration.labels(status="hit").observe(duration)
            return json.loads(cached)
        duration = time.time() - start
        cache_misses.labels(cache_key_prefix="product").inc()
        cache_get_duration.labels(status="miss").observe(duration)
        # Cache miss → читаем из DB и заполняем cache (как в Шаге 3)
        row = await self.db.fetchrow(
            "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
            product_id,
        )
        if not row:
            raise KeyError(f"Product {product_id} not found")
        product = dict(row)
        await self.redis.setex(f"product:{product_id}", 60, json.dumps(product))
        return product

Circuit Breaker для cache
import time

class CacheWithCircuitBreaker:
    """
    Circuit breaker: если Redis падает — graceful degradation
    (временно работаем без cache вместо каскадных ошибок).
    """
def __init__(self, redis, failure_threshold=5, timeout=60):
self.redis = redis
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.state = "closed" # closed | open | half-open
self.last_failure_time = None
    async def get(self, key: str) -> str | None:
        if self.state == "open":
            # Спустя timeout пробуем полузакрыть circuit
            if time.time() - self.last_failure_time >= self.timeout:
                self.state = "half-open"
            else:
                # Circuit open → skip cache, идём сразу в DB
                return None
        try:
            value = await self.redis.get(key)
            self._on_success()
            return value
        except Exception:
            self._on_failure()
            # Graceful: return None (cache miss) вместо exception
            return None
def _on_success(self):
self.failure_count = 0
if self.state == "half-open":
self.state = "closed"
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

📊 Business Impact & ROI
Problem cost (до фикса)
Situation (Black Friday incident):
- Stale cache duration: 60 seconds (original TTL)
- Affected customers: 234 in 2 hours
- Revenue loss: $15,000 (abandoned carts)
- Refunds: $8,000
- Support overhead: $1,600
- Engineering time: $1,200
Single incident cost: $25,800
Annual extrapolation:
- Frequency: 1 incident/month (sales, price updates)
- Annual cost: $309,600
Solution cost
Implementation:
- Choose strategy (Cache Aside): 2 hours
- Implement invalidation logic: 4 hours
- Write tests: 3 hours
- Deploy & validate: 2 hours
- Total: 11h × $100/h = $1,100
ROI
Annual savings:
- Incidents eliminated: $309,600/year
- Investment: $1,100
ROI: ≈28,000% (($309,600 − $1,100) / $1,100)
Break-even: 1.3 дня
Additional benefits:
- ✅ Customer trust restored
- ✅ No more support escalations
- ✅ Faster checkout (correct prices first time)
Production checklist
- Strategy selected: Cache Aside / Write Through / TTL-based
- Tests written: Stale cache scenarios covered
- Monitoring: Cache hit/miss metrics, staleness detection
- Circuit breaker: Graceful degradation if Redis fails
- TTL tuning: Balance freshness vs performance
- Distributed invalidation: Pub/Sub if multi-server
- Load testing: Verify under peak load
- Runbook: What to do when cache inconsistency detected
Что вынести с урока
- Stale cache — реальная проблема, не corner case
- Cache Aside — золотой стандарт для критичных данных
- Write Through — для hot data (часто читаем, редко пишем)
- TTL-based — для некритичных данных (eventual consistency OK)
- Trade-offs matter: Consistency vs Performance vs Complexity
- Monitoring обязателен: stale reads нужно видеть в метриках (набросок детектора — сразу после списка)
- Circuit breaker для resilience: Cache падает → work without cache
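Набросок детектора stale reads (допущение: фикстуры db/redis из урока; счётчик — аналог stale_reads из Шага 6, в одном процессе регистрируйте его один раз; имя функции гипотетическое):
import json
from prometheus_client import Counter

stale_reads = Counter("cache_stale_reads_total", "Stale cache reads detected")

async def detect_stale_cache(db, redis, product_id: str) -> bool:
    """Сверяет цену в cache с DB; True = рассинхронизация (+ инкремент метрики)."""
    cached = await redis.get(f"product:{product_id}")
    if cached is None:
        return False  # cache miss — сравнивать нечего
    row = await db.fetchrow(
        "SELECT price FROM products WHERE product_id = $1", product_id
    )
    if row and json.loads(cached)["price"] != row["price"]:
        stale_reads.inc()
        return True
    return False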
Главное отличие от "учебных примеров":
- ❌ Не игнорируем race conditions ("cache just works")
- ✅ Показываем реальный Black Friday incident
- ✅ Воспроизводим stale cache в нескольких реалистичных сценариях (4 падающих теста)
- ✅ Даём 3 production-ready стратегии с trade-offs
- ✅ Измеряем ROI: $309K annual savings
В следующем уроке: Observability для расследования flaky-тестов — Prometheus метрики, OpenTelemetry трейсы, structured logging.