Pytest: Борьба с Race Conditions в Async-коде

Гонки кеша: устаревшие данные в Redis

40 минут

Этот урок объединяет знания из уроков 4 (гонки PostgreSQL) и 5 (Redis фундамент). Теперь ловим race conditions между кешем и БД — самые коварные гонки в асинхронных приложениях.

Главный фокус — cache invalidation: научиться тестировать race conditions между Redis и PostgreSQL, и выбрать правильную стратегию кеширования для production.

Production Issue #2847

Репорт от Support:

"Customers report seeing outdated prices during Black Friday sale. Example: product page shows $99 (cached), but checkout shows $79 (actual DB price). 234 complaints in 2 hours, customers confused, some abandoned carts. Estimated revenue loss: $15K+."

Симптомы:

  • Cache hit rate: 95% (отлично!)
  • Но: 5-10% пользователей видят stale data
  • Проблема усиливается при высокой нагрузке
  • В логах: "price mismatch" между cache и DB

Root cause: Race condition при обновлении данных:

  1. Thread A читает product из Redis (old price: $99)
  2. Thread B обновляет product в PostgreSQL (new price: $79)
  3. Thread B забывает инвалидировать cache
  4. Thread A продолжает видеть old price из cache
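
Эту последовательность из четырёх шагов можно воспроизвести даже без Redis и PostgreSQL. Минимальный эскиз на обычных словарях (здесь cache и db — просто dict, имена условные):

```python
import asyncio

# Условные "Redis" и "PostgreSQL": обычные словари
cache = {"product:1": {"price": 9900}}  # старая цена уже закеширована
db = {"product:1": {"price": 9900}}

async def thread_a_reader():
    # Шаги 1 и 4: читатель берёт данные из cache, не проверяя актуальность
    return cache["product:1"]["price"]

async def thread_b_writer():
    # Шаги 2-3: обновляет "БД", но забывает инвалидировать cache
    db["product:1"] = {"price": 7900}

async def main():
    await thread_b_writer()
    stale_price = await thread_a_reader()
    assert db["product:1"]["price"] == 7900  # в БД уже новая цена
    return stale_price

print(asyncio.run(main()))  # 9900: читатель видит старую цену из cache
```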

Стоимость инцидента (Black Friday):

  • Direct revenue loss: $15,000 (abandoned carts)
  • Refunds для confused customers: $8,000
  • Support time: 40 hours × $40/h = $1,600
  • Engineering investigation: 12 hours × $100/h = $1,200
  • Total impact: $25,800 за 2 часа
  • Extrapolated annual cost: если проблема повторяется 1 раз/месяц = $300K/year
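
Те же цифры одним коротким расчётом (все числа взяты из репорта выше):

```python
direct_loss = 15_000   # abandoned carts
refunds = 8_000        # confused customers
support = 40 * 40      # 40 часов по $40/час
engineering = 12 * 100 # 12 часов по $100/час

total = direct_loss + refunds + support + engineering
print(total)       # 25800
print(total * 12)  # 309600: примерно $300K/год при одном инциденте в месяц
```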

Что такое cache race condition

Cache race condition — ситуация, когда кеш и источник данных рассинхронизированы из-за конкурентного доступа.

Основные типы cache races:

  1. Stale cache (устаревший кеш):

    • DB обновлена, cache НЕ инвалидирован
    • Readers видят старые данные
  2. Cache stampede (лавина промахов):

    • Cache expires, 1000 requests одновременно идут в DB
    • DB overload
  3. Write-write conflict:

    • Два writer'а обновляют cache/DB в разном порядке
    • Inconsistent state
  4. Thundering herd:

    • Множественные writer'ы пытаются заполнить пустой cache
    • Race на запись

В этом уроке фокусируемся на #1 (stale cache) — самой частой и самой коварной проблеме.
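
Для контраста полезно увидеть тип #2 в миниатюре: при пустом кеше все конкурентные читатели промахиваются одновременно и каждый идёт в БД. Эскиз на in-memory-заглушках (slow_db_read имитирует медленный запрос, имена условные):

```python
import asyncio

db_calls = 0
cache = {}  # пустой cache: все читатели промахнутся одновременно

async def slow_db_read(key):
    """Имитация медленного запроса в БД (условная заглушка)."""
    global db_calls
    db_calls += 1
    await asyncio.sleep(0.01)
    return 9900

async def get_price(key):
    if key in cache:
        return cache[key]
    price = await slow_db_read(key)  # сюда попадают ВСЕ конкурентные читатели
    cache[key] = price
    return price

async def main():
    # 100 одновременных чтений при пустом кеше
    await asyncio.gather(*(get_price("product:1") for _ in range(100)))
    return db_calls

print(asyncio.run(main()))  # 100: каждый промах ушёл в БД, это и есть stampede
```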

Подготовка

Продолжаем проект из урока 01. У нас уже есть PostgreSQL + Redis + async fixtures с транзакциями.

Расширяем схему БД для products:

-- schema.sql (добавляем к существующей)
CREATE TABLE IF NOT EXISTS products (
    id SERIAL PRIMARY KEY,
    product_id VARCHAR(36) UNIQUE NOT NULL,
    name VARCHAR(200) NOT NULL,
    price INTEGER NOT NULL,  -- В центах для точности
    stock INTEGER NOT NULL DEFAULT 0,
    updated_at TIMESTAMP DEFAULT NOW()
);
 
-- Отдельный индекс создавать не нужно:
-- ограничение UNIQUE на product_id уже создаёт уникальный индекс.

Применяем:

docker exec -i pytest-postgres psql -U postgres -d todo_test < schema.sql

Зависимости уже установлены (из урока 01):

pip install asyncpg aioredis pytest-asyncio

Шаг 1. Воспроизводим stale cache

Создайте tests/integration/test_cache_race_naive.py. Воспроизводим реальную проблему: читатель видит старые данные после обновления.

import asyncio
import asyncpg
import pytest
import json
 
 
class NaiveProductService:
    """
    ПЛОХАЯ реализация: нет синхронизации между cache и DB!
 
    Проблема: update в DB не инвалидирует cache → stale reads.
    """
 
    def __init__(self, db_conn, redis):
        self.db = db_conn
        self.redis = redis
 
    async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
        """Создаёт продукт в DB и кеше"""
        await self.db.execute(
            "INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
            product_id, name, price, stock
        )
 
        product = {
            "product_id": product_id,
            "name": name,
            "price": price,
            "stock": stock
        }
 
        # Кешируем на 60 секунд
        await self.redis.setex(
            f"product:{product_id}",
            60,
            json.dumps(product)
        )
 
        return product
 
    async def get_product(self, product_id: str) -> dict:
        """
        Читает продукт: сначала cache, потом DB.
 
        ПРОБЛЕМА: если cache hit, возвращаем не проверяя актуальность!
        """
        # 1. Пытаемся из cache
        cached = await self.redis.get(f"product:{product_id}")
        if cached:
            return json.loads(cached)
 
        # 2. Cache miss → идём в DB
        row = await self.db.fetchrow(
            "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
            product_id
        )
 
        if not row:
            raise KeyError(f"Product {product_id} not found")
 
        product = dict(row)
 
        # 3. Сохраняем в cache
        await self.redis.setex(
            f"product:{product_id}",
            60,
            json.dumps(product)
        )
 
        return product
 
    async def update_product_price(self, product_id: str, new_price: int) -> None:
        """
        ОПАСНАЯ РЕАЛИЗАЦИЯ: обновляет DB, но НЕ инвалидирует cache!
 
        Это и есть источник stale cache race condition.
        """
        await self.db.execute(
            "UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
            new_price, product_id
        )
 
        # ❌ БАГ: НЕ инвалидируем cache!
        # Старые данные остаются в Redis до expiry (60 секунд)
 
 
# ФИКСТУРЫ ИЗ УРОКА 01
 
@pytest.fixture(scope="session")
async def db_pool():
    """Session-scoped connection pool"""
    pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        user="postgres",
        password="testpass",
        database="todo_test",
        min_size=2,
        max_size=10
    )
    yield pool
    await pool.close()
 
 
@pytest.fixture
async def db_connection(db_pool):
    """Function-scoped connection с транзакцией"""
    async with db_pool.acquire() as conn:
        tx = conn.transaction()
        await tx.start()
        yield conn
        await tx.rollback()
 
 
@pytest.fixture
async def redis_client():
    """Redis client с cleanup"""
    import aioredis
    # В aioredis 2.x from_url не является корутиной, await здесь не нужен
    redis = aioredis.from_url(
        "redis://localhost:6379",
        decode_responses=True
    )
    yield redis
    await redis.flushdb()
    await redis.close()
 
 
@pytest.fixture
async def naive_product_service(db_connection, redis_client):
    """Product service с наивной (плохой) реализацией"""
    return NaiveProductService(db_connection, redis_client)
 
 
# ТЕСТЫ, КОТОРЫЕ ДЕМОНСТРИРУЮТ STALE CACHE RACE
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_stale_cache_after_price_update(naive_product_service):
    """
    Тест #1: простейший сценарий stale cache.
 
    1. Создаём продукт с price=$100
    2. Читаем из cache → $100 ✓
    3. Обновляем price в DB → $50
    4. Читаем снова → всё ещё $100 ❌ (stale cache!)
    """
    service = naive_product_service
 
    # 1. Создаём продукт
    await service.create_product(
        product_id="laptop-x1",
        name="Gaming Laptop",
        price=10000,  # $100.00 в центах
        stock=50
    )
 
    # 2. Первое чтение (cache hit после create)
    product = await service.get_product("laptop-x1")
    assert product["price"] == 10000
 
    # 3. Admin обновляет цену в DB
    await service.update_product_price("laptop-x1", 5000)  # $50.00
 
    # 4. User читает продукт снова
    product = await service.get_product("laptop-x1")
 
    # ❌ БАГ: видим СТАРУЮ цену из cache!
    # Expected: 5000 (новая цена из DB)
    # Actual: 10000 (старая цена из cache)
    assert product["price"] == 5000, (
        f"Stale cache detected! Expected price=$50 (DB), "
        f"but got price=${product['price']/100} (cached)"
    )
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_concurrent_read_during_write(naive_product_service):
    """
    Тест #2: race condition — reader и writer одновременно.
 
    Timeline:
    - t=0: Reader starts (cache miss, reads DB)
    - t=0: Writer updates DB + cache
    - t=1: Reader finishes (writes OLD value to cache!)
    - Result: stale cache победил
    """
    service = naive_product_service
 
    # Создаём продукт
    await service.create_product("phone-z1", "Flagship Phone", 80000, 100)
 
    # Очищаем cache, чтобы создать cache miss
    await service.redis.delete("product:phone-z1")
 
    async def slow_reader():
        """Reader с задержкой (имитирует медленный DB query)"""
        # Читаем из DB (cache miss)
        row = await service.db.fetchrow(
            "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
            "phone-z1"
        )
        product = dict(row)
 
        # ЗАДЕРЖКА: тут writer успевает обновить price!
        await asyncio.sleep(0.05)
 
        # Пишем СТАРЫЕ данные в cache
        await service.redis.setex(
            f"product:{product['product_id']}",
            60,
            json.dumps(product)
        )
        return product
 
    async def fast_writer():
        """Writer обновляет price"""
        await asyncio.sleep(0.01)  # Даём reader начать
        await service.update_product_price("phone-z1", 60000)  # $600
 
    # Запускаем параллельно
    reader_result, _ = await asyncio.gather(
        slow_reader(),
        fast_writer()
    )
 
    # Reader вернул старую цену (прочитал до update)
    assert reader_result["price"] == 80000
 
    # НО! Проверяем что в cache сейчас
    cached = await service.get_product("phone-z1")
 
    # ❌ БАГ: в cache СТАРАЯ цена, хотя в DB новая!
    # Это классическая race condition
    assert cached["price"] == 60000, (
        f"Race condition! Writer updated to $600, "
        f"but cache has stale ${cached['price']/100} from slow reader"
    )
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_multiple_concurrent_updates(naive_product_service):
    """
    Тест #3: множественные обновления → непредсказуемое состояние cache.
 
    3 writer'а одновременно обновляют price.
    Результат: last write wins, но в cache может быть любой из трёх!
    """
    service = naive_product_service
 
    await service.create_product("monitor-4k", "4K Monitor", 50000, 20)
 
    async def update_price(new_price: int):
        await service.update_product_price("monitor-4k", new_price)
        await asyncio.sleep(0.01)  # Даём времени перемешаться
 
    # 3 параллельных обновления
    await asyncio.gather(
        update_price(40000),  # $400
        update_price(45000),  # $450
        update_price(42000),  # $420
    )
 
    # В DB: последний writer (один из трёх)
    db_row = await service.db.fetchrow(
        "SELECT price FROM products WHERE product_id = $1",
        "monitor-4k"
    )
    db_price = db_row["price"]
 
    # В cache: может быть ЛЮБОЙ из трёх (или даже original $500!)
    cached = await service.get_product("monitor-4k")
    cached_price = cached["price"]
 
    # ❌ БАГ: cache и DB рассинхронизированы
    assert db_price == cached_price, (
        f"Cache inconsistency! DB has ${db_price/100}, "
        f"but cache has ${cached_price/100}"
    )
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_real_world_checkout_scenario(naive_product_service):
    """
    Тест #4: реальный сценарий Black Friday.
 
    User flow:
    1. View product page (reads from cache: $99)
    2. Admin applies discount (updates DB: $79)
    3. User adds to cart (still sees cached $99)
    4. User proceeds to checkout (reads fresh from DB: $79)
    5. User confused: "Price changed in my cart!"
    """
    service = naive_product_service
 
    # Black Friday product
    await service.create_product(
        "headphones-pro",
        "Pro Headphones",
        9900,  # $99
        500
    )
 
    # 1. User views product page
    product_view = await service.get_product("headphones-pro")
    assert product_view["price"] == 9900
    user_saw_price = product_view["price"]
 
    # 2. Admin applies Black Friday discount (20% off)
    await service.update_product_price("headphones-pro", 7900)  # $79
 
    # 3. User adds to cart (читает из cache — stale!)
    product_in_cart = await service.get_product("headphones-pro")
    cart_price = product_in_cart["price"]
 
    # 4. User proceeds to checkout (fresh read from DB)
    # Имитируем: checkout service ВСЕГДА читает из DB для точности
    db_row = await service.db.fetchrow(
        "SELECT price FROM products WHERE product_id = $1",
        "headphones-pro"
    )
    checkout_price = db_row["price"]
 
    # ❌ ПРОБЛЕМА: цены не совпадают!
    # User видел $99, в корзине $99, но checkout показывает $79
    price_mismatch = cart_price != checkout_price
 
    if price_mismatch:
        print(f"⚠️  Price mismatch detected!")
        print(f"   User saw: ${user_saw_price/100}")
        print(f"   Cart price: ${cart_price/100}")
        print(f"   Checkout price: ${checkout_price/100}")
        print(f"   → Customer confused, potential cart abandonment")
 
    # Это реальная проблема из Production Issue #2847
    assert not price_mismatch, (
        f"Black Friday incident reproduced! "
        f"Cart shows ${cart_price/100}, but checkout is ${checkout_price/100}"
    )

Запуск тестов:

# Все тесты должны упасть с stale cache errors
pytest tests/integration/test_cache_race_naive.py -v -s
 
# Ожидаемый результат: 4/4 FAILED

Ожидаемые ошибки:

FAILED test_stale_cache_after_price_update - AssertionError:
    Stale cache detected! Expected price=$50 (DB), but got price=$100 (cached)
 
FAILED test_concurrent_read_during_write - AssertionError:
    Race condition! Writer updated to $600, but cache has stale $800
 
FAILED test_multiple_concurrent_updates - AssertionError:
    Cache inconsistency! DB has $420, but cache has $500
 
FAILED test_real_world_checkout_scenario - AssertionError:
    Black Friday incident reproduced! Cart shows $99, but checkout is $79

Шаг 2. Диагностика: видим cache race в действии

Добавим observability, чтобы понять, как именно происходит рассинхронизация.

# tests/integration/test_cache_race_debug.py
import asyncio
import asyncpg
import pytest
import json
import logging
from datetime import datetime
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
 
class ObservableProductService:
    """
    Product service с полным логированием всех cache/DB операций.
    """
 
    def __init__(self, db_conn, redis):
        self.db = db_conn
        self.redis = redis
        self.operation_log = []  # История всех операций для анализа
 
    def _log_operation(self, operation: str, **kwargs):
        """Логируем каждую операцию с timestamp"""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "operation": operation,
            **kwargs
        }
        self.operation_log.append(entry)
        logger.info(f"[{operation}] {kwargs}")
 
    async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
        self._log_operation("CREATE_START", product_id=product_id, price=price)
 
        await self.db.execute(
            "INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
            product_id, name, price, stock
        )
        self._log_operation("DB_INSERT", product_id=product_id, price=price)
 
        product = {"product_id": product_id, "name": name, "price": price, "stock": stock}
 
        await self.redis.setex(f"product:{product_id}", 60, json.dumps(product))
        self._log_operation("CACHE_SET", product_id=product_id, price=price, ttl=60)
 
        return product
 
    async def get_product(self, product_id: str) -> dict:
        self._log_operation("GET_START", product_id=product_id)
 
        # Проверяем cache
        cached = await self.redis.get(f"product:{product_id}")
        if cached:
            product = json.loads(cached)
            self._log_operation("CACHE_HIT", product_id=product_id, price=product["price"])
            return product
 
        self._log_operation("CACHE_MISS", product_id=product_id)
 
        # Читаем из DB
        row = await self.db.fetchrow(
            "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
            product_id
        )
 
        if not row:
            raise KeyError(f"Product {product_id} not found")
 
        product = dict(row)
        self._log_operation("DB_READ", product_id=product_id, price=product["price"])
 
        # Пишем в cache
        await self.redis.setex(f"product:{product_id}", 60, json.dumps(product))
        self._log_operation("CACHE_SET", product_id=product_id, price=product["price"], ttl=60)
 
        return product
 
    async def update_product_price(self, product_id: str, new_price: int) -> None:
        self._log_operation("UPDATE_START", product_id=product_id, new_price=new_price)
 
        await self.db.execute(
            "UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
            new_price, product_id
        )
        self._log_operation("DB_UPDATE", product_id=product_id, new_price=new_price)
 
        # ❌ БАГ: не инвалидируем cache!
 
    def print_operation_timeline(self):
        """Выводит timeline всех операций для debugging"""
        print("\n" + "="*80)
        print("OPERATION TIMELINE (showing cache race)")
        print("="*80)
 
        for i, op in enumerate(self.operation_log):
            timestamp = op["timestamp"].split("T")[1][:12]  # HH:MM:SS.mmm
            operation = op["operation"]
            details = {k: v for k, v in op.items() if k not in ["timestamp", "operation"]}
 
            print(f"{i+1:2d}. [{timestamp}] {operation:20s} {details}")
 
        print("="*80 + "\n")
 
 
@pytest.fixture
async def observable_product_service(db_connection, redis_client):
    """Service с логированием"""
    return ObservableProductService(db_connection, redis_client)
 
 
@pytest.mark.asyncio
async def test_visualize_stale_cache_race(observable_product_service):
    """
    Debug тест: визуализируем КАК происходит stale cache.
    """
    service = observable_product_service
 
    # Создаём продукт
    await service.create_product("debug-product", "Test Product", 10000, 100)
 
    # Читаем (cache hit)
    await service.get_product("debug-product")
 
    # Обновляем price
    await service.update_product_price("debug-product", 5000)
 
    # Читаем снова (stale cache hit!)
    product = await service.get_product("debug-product")
 
    # Выводим timeline
    service.print_operation_timeline()
 
    # Показываем проблему
    print(f"\n⚠️  STALE CACHE DETECTED:")
    print(f"   DB price: $50.00 (updated)")
    print(f"   Cache price: ${product['price']/100:.2f} (stale!)")
    print(f"   Cache should have been invalidated after UPDATE_START\n")
 
 
@pytest.mark.asyncio
async def test_visualize_concurrent_race(observable_product_service):
    """
    Debug тест: визуализируем race между reader и writer.
    """
    service = observable_product_service
 
    await service.create_product("race-product", "Race Product", 10000, 50)
    await service.redis.delete("product:race-product")  # Force cache miss
 
    async def reader():
        await asyncio.sleep(0.001)
        return await service.get_product("race-product")
 
    async def writer():
        await asyncio.sleep(0.002)
        await service.update_product_price("race-product", 8000)
 
    # Запускаем параллельно
    await asyncio.gather(reader(), writer())
 
    # Timeline покажет порядок операций
    service.print_operation_timeline()
 
    print(f"\n📊 ANALYSIS:")
    print(f"   Look at the timeline above:")
    print(f"   - CACHE_MISS → DB_READ → (writer UPDATE) → CACHE_SET")
    print(f"   - Reader cached OLD value AFTER writer updated DB")
    print(f"   - This is a classic race condition\n")

Запуск с выводом timeline:

pytest tests/integration/test_cache_race_debug.py::test_visualize_stale_cache_race -v -s

Что вы увидите:

================================================================================
OPERATION TIMELINE (showing cache race)
================================================================================
 1. [10:15:23.001] CREATE_START         {'product_id': 'debug-product', 'price': 10000}
 2. [10:15:23.012] DB_INSERT            {'product_id': 'debug-product', 'price': 10000}
 3. [10:15:23.015] CACHE_SET            {'product_id': 'debug-product', 'price': 10000, 'ttl': 60}
 4. [10:15:23.020] GET_START            {'product_id': 'debug-product'}
 5. [10:15:23.022] CACHE_HIT            {'product_id': 'debug-product', 'price': 10000}
 6. [10:15:23.025] UPDATE_START         {'product_id': 'debug-product', 'new_price': 5000}
 7. [10:15:23.030] DB_UPDATE            {'product_id': 'debug-product', 'new_price': 5000}
 8. [10:15:23.035] GET_START            {'product_id': 'debug-product'}
 9. [10:15:23.037] CACHE_HIT            {'product_id': 'debug-product', 'price': 10000}  ← STALE!
================================================================================
 
⚠️  STALE CACHE DETECTED:
   DB price: $50.00 (updated)
   Cache price: $100.00 (stale!)
   Cache should have been invalidated after UPDATE_START

Проблема очевидна: после DB_UPDATE (строка 7) мы не инвалидировали cache, поэтому следующий GET_START (строка 8) получил CACHE_HIT со старыми данными.

Шаг 3. Фикс #1 — Cache Aside (manual invalidation)

Самое простое решение: явно удаляем cache после каждого update.

# tests/integration/test_cache_aside_pattern.py
import asyncio
import asyncpg
import pytest
import json
 
 
class ProductServiceCacheAside:
    """
    Cache Aside pattern: читаем из cache, пишем в DB + инвалидируем cache.
 
    Strategy:
    - Read: cache → DB (if miss) → populate cache
    - Write: DB → invalidate cache
    - Next read: cache miss → fresh from DB → populate
    """
 
    def __init__(self, db_conn, redis):
        self.db = db_conn
        self.redis = redis
 
    async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
        await self.db.execute(
            "INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
            product_id, name, price, stock
        )
 
        product = {"product_id": product_id, "name": name, "price": price, "stock": stock}
 
        # Кешируем после create
        await self.redis.setex(f"product:{product_id}", 60, json.dumps(product))
 
        return product
 
    async def get_product(self, product_id: str) -> dict:
        # 1. Try cache
        cached = await self.redis.get(f"product:{product_id}")
        if cached:
            return json.loads(cached)
 
        # 2. Cache miss → DB
        row = await self.db.fetchrow(
            "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
            product_id
        )
 
        if not row:
            raise KeyError(f"Product {product_id} not found")
 
        product = dict(row)
 
        # 3. Populate cache
        await self.redis.setex(f"product:{product_id}", 60, json.dumps(product))
 
        return product
 
    async def update_product_price(self, product_id: str, new_price: int) -> None:
        """
        ✅ ПРАВИЛЬНАЯ РЕАЛИЗАЦИЯ: обновляем DB + инвалидируем cache.
        """
        # 1. Update DB
        await self.db.execute(
            "UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
            new_price, product_id
        )
 
        # 2. ✅ Инвалидируем cache
        await self.redis.delete(f"product:{product_id}")
 
        # Следующий read будет cache miss → fresh data from DB
 
 
@pytest.fixture
async def cache_aside_service(db_connection, redis_client):
    """Service с Cache Aside pattern"""
    return ProductServiceCacheAside(db_connection, redis_client)
 
 
# ТЕСТЫ ПРОВЕРЯЮТ, ЧТО CACHE ASIDE РАБОТАЕТ
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_cache_aside_no_stale_reads(cache_aside_service):
    """
    Тест #1: Cache Aside решает stale cache проблему.
    """
    service = cache_aside_service
 
    # Create
    await service.create_product("laptop-fixed", "Laptop", 10000, 50)
 
    # Read (cache hit)
    product = await service.get_product("laptop-fixed")
    assert product["price"] == 10000
 
    # Update
    await service.update_product_price("laptop-fixed", 5000)
 
    # Read again (cache miss → fresh from DB)
    product = await service.get_product("laptop-fixed")
 
    # ✅ NO stale cache!
    assert product["price"] == 5000
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_cache_aside_under_concurrent_load(cache_aside_service):
    """
    Тест #2: Cache Aside стабилен при параллельных updates.
    """
    service = cache_aside_service
 
    await service.create_product("monitor-fixed", "Monitor", 50000, 20)
 
    # 10 параллельных updates
    prices = [45000, 46000, 47000, 48000, 49000, 50000, 51000, 52000, 53000, 54000]
 
    await asyncio.gather(*[
        service.update_product_price("monitor-fixed", price)
        for price in prices
    ])
 
    # Читаем финальный результат
    product = await service.get_product("monitor-fixed")
 
    # Проверяем что cache и DB синхронизированы
    db_row = await service.db.fetchrow(
        "SELECT price FROM products WHERE product_id = $1",
        "monitor-fixed"
    )
 
    # ✅ Cache = DB (нет рассинхронизации)
    assert product["price"] == db_row["price"]
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_cache_aside_black_friday_scenario(cache_aside_service):
    """
    Тест #3: Black Friday сценарий теперь работает корректно.
    """
    service = cache_aside_service
 
    # Setup
    await service.create_product("headphones-fixed", "Headphones", 9900, 500)
 
    # User views (cache hit)
    product_view = await service.get_product("headphones-fixed")
    assert product_view["price"] == 9900
 
    # Admin applies discount
    await service.update_product_price("headphones-fixed", 7900)
 
    # User adds to cart (cache miss → fresh price!)
    product_in_cart = await service.get_product("headphones-fixed")
    cart_price = product_in_cart["price"]
 
    # Checkout
    db_row = await service.db.fetchrow(
        "SELECT price FROM products WHERE product_id = $1",
        "headphones-fixed"
    )
    checkout_price = db_row["price"]
 
    # ✅ NO price mismatch!
    assert cart_price == checkout_price == 7900

Запуск фикс-тестов:

pytest tests/integration/test_cache_aside_pattern.py -v
 
# ✅ PASS (3/3) — все тесты зелёные!

Почему Cache Aside работает:

  1. Update → DB write + cache invalidation (два последовательных шага внутри одной функции; строго говоря, не атомарно, но окно рассинхронизации минимально)
  2. Next read → cache miss (потому что delete)
  3. Fetch from DB → получаем свежие данные
  4. Repopulate cache → кешируем актуальные данные

Плюсы:

  • ✅ Простая реализация
  • ✅ Всегда актуальные данные
  • ✅ Работает при любой нагрузке

Минусы:

  • ❌ Каждый update → cache miss → DB query (медленнее)
  • ❌ Cache stampede risk (если много readers после invalidation)
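
Риск cache stampede после инвалидации обычно гасят «single flight»-блокировкой: при промахе в БД идёт только одна корутина, остальные ждут её результата. Это не часть кода урока, а эскиз идеи на asyncio.Lock и in-memory-заглушках:

```python
import asyncio

class SingleFlightCache:
    """Эскиз: при промахе в "БД" идёт только одна корутина, остальные ждут."""

    def __init__(self):
        self.cache = {}
        self.lock = asyncio.Lock()
        self.db_calls = 0

    async def _db_read(self, key):
        self.db_calls += 1
        await asyncio.sleep(0.01)  # имитация медленного запроса в БД
        return 7900

    async def get(self, key):
        if key in self.cache:  # быстрый путь: cache hit без lock
            return self.cache[key]
        async with self.lock:
            # double-check: пока ждали lock, другая корутина могла заполнить cache
            if key in self.cache:
                return self.cache[key]
            value = await self._db_read(key)
            self.cache[key] = value
            return value

async def main():
    sf = SingleFlightCache()
    await asyncio.gather(*(sf.get("product:1") for _ in range(100)))
    return sf.db_calls

print(asyncio.run(main()))  # 1: в БД сходил только первый, остальные дождались
```

В production один глобальный asyncio.Lock сериализовал бы промахи по разным ключам; обычно берут lock на каждый ключ или распределённый lock в Redis.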

Шаг 4. Фикс #2 — Write Through (atomic write)

Write Through pattern: после записи в DB сразу записываем свежие данные в cache (вместо delete).

# tests/integration/test_write_through_pattern.py
import asyncio
import asyncpg
import pytest
import json
 
 
class ProductServiceWriteThrough:
    """
    Write Through pattern: пишем в DB и cache одновременно.
 
    Strategy:
    - Read: cache → DB (if miss) → populate cache
    - Write: DB → update cache (не delete!)
    - Cache почти всегда свежий (окно рассинхронизации минимально)
    """
 
    def __init__(self, db_conn, redis):
        self.db = db_conn
        self.redis = redis
 
    async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
        await self.db.execute(
            "INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
            product_id, name, price, stock
        )
 
        product = {"product_id": product_id, "name": name, "price": price, "stock": stock}
        await self.redis.setex(f"product:{product_id}", 300, json.dumps(product))
 
        return product
 
    async def get_product(self, product_id: str) -> dict:
        cached = await self.redis.get(f"product:{product_id}")
        if cached:
            return json.loads(cached)
 
        row = await self.db.fetchrow(
            "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
            product_id
        )
 
        if not row:
            raise KeyError(f"Product {product_id} not found")
 
        product = dict(row)
        await self.redis.setex(f"product:{product_id}", 300, json.dumps(product))
 
        return product
 
    async def update_product_price(self, product_id: str, new_price: int) -> None:
        """
        ✅ Write Through: обновляем DB и сразу записываем свежие данные в cache.
        """
        # 1. Update DB
        await self.db.execute(
            "UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
            new_price, product_id
        )
 
        # 2. Читаем полный объект из DB (для cache)
        row = await self.db.fetchrow(
            "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
            product_id
        )
 
        if row:
            product = dict(row)
            # 3. ✅ Обновляем cache (не delete!)
            await self.redis.setex(f"product:{product_id}", 300, json.dumps(product))
 
 
@pytest.fixture
async def write_through_service(db_connection, redis_client):
    """Service с Write Through pattern"""
    return ProductServiceWriteThrough(db_connection, redis_client)
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_write_through_no_cache_miss(write_through_service):
    """
    Тест #1: Write Through избегает cache miss после update.
    """
    service = write_through_service
 
    await service.create_product("keyboard-wt", "Keyboard", 15000, 100)
 
    # Update
    await service.update_product_price("keyboard-wt", 12000)
 
    # Проверяем что в cache УЖЕ новая цена (без DB query!)
    cached = await service.redis.get("product:keyboard-wt")
    cached_product = json.loads(cached)
 
    # ✅ Cache обновлён без miss
    assert cached_product["price"] == 12000
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_write_through_performance(write_through_service):
    """
    Тест #2: Write Through быстрее (нет cache miss).
 
    Сравниваем: Cache Aside vs Write Through.
    """
    service = write_through_service
 
    await service.create_product("mouse-wt", "Mouse", 5000, 200)
 
    import time
 
    # 100 updates + reads
    start = time.time()
    for i in range(100):
        await service.update_product_price("mouse-wt", 5000 + i)
        await service.get_product("mouse-wt")  # ← Cache hit (fast!)
 
    duration = time.time() - start
 
    print(f"\n⚡ Write Through: 100 update+read cycles in {duration:.3f}s")
    print(f"   Average: {duration/100*1000:.1f}ms per cycle")
 
    # Write Through должен быть < 3s (cache hits)
    assert duration < 3.0, f"Too slow: {duration:.3f}s"

Запуск:

pytest tests/integration/test_write_through_pattern.py -v -s

Результат:

⚡ Write Through: 100 update+read cycles in 0.845s
   Average: 8.5ms per cycle
 
✅ PASS (2/2)

Write Through плюсы:

  • ✅ Нет cache miss после update (быстрее)
  • ✅ Cache всегда актуален
  • ✅ Меньше нагрузка на DB (меньше reads)

Write Through минусы:

  • ❌ Сложнее реализация (нужно читать full object из DB)
  • ❌ Update медленнее (DB write + cache write)
  • ❌ Если cache недоступен → write fail (нужен fallback)
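
Последний минус можно смягчить fallback'ом: если запись в cache упала, деградируем до инвалидации ключа, чтобы хотя бы не оставить stale данные. Эскиз на фейковом кеше (FlakyCache имитирует недоступный на запись Redis; здесь предполагается, что delete ещё проходит — в реальности понадобится retry или отложенная инвалидация):

```python
import asyncio
import json
import logging

logger = logging.getLogger(__name__)

class FlakyCache:
    """Фейковый cache: запись падает, как при недоступном Redis."""
    def __init__(self):
        self.data = {}

    async def setex(self, key, ttl, value):
        raise ConnectionError("redis unavailable")

    async def delete(self, key):
        self.data.pop(key, None)  # в этом эскизе delete ещё проходит

async def update_price_with_fallback(cache, db, product_id, new_price):
    # 1. Сначала DB: источник истины обновлён в любом случае
    db[product_id] = {"product_id": product_id, "price": new_price}
    try:
        # 2. Write Through: пробуем записать свежие данные в cache
        await cache.setex(f"product:{product_id}", 300, json.dumps(db[product_id]))
    except ConnectionError:
        # 3. Fallback: деградируем до инвалидации, чтобы не оставить stale данные
        logger.warning("cache write failed, falling back to delete")
        await cache.delete(f"product:{product_id}")

async def main():
    cache, db = FlakyCache(), {}
    cache.data["product:x1"] = json.dumps({"price": 9900})  # старое значение
    await update_price_with_fallback(cache, db, "x1", 7900)
    return db["x1"]["price"], "product:x1" in cache.data

print(asyncio.run(main()))  # (7900, False): DB свежая, stale-ключ удалён
```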

Шаг 5. Фикс #3 — TTL-based (eventual consistency)

TTL-based pattern: не инвалидируем cache, полагаемся на автоматический expiry.

# tests/integration/test_ttl_based_pattern.py
import asyncio
import asyncpg
import pytest
import json
 
 
class ProductServiceTTL:
    """
    TTL-based pattern: eventual consistency через короткий TTL.
 
    Strategy:
    - Read: cache → DB (if miss/expired) → populate with short TTL
    - Write: DB only (не трогаем cache!)
    - Cache expires через 5-10 секунд → auto-refresh
    """
 
    def __init__(self, db_conn, redis, cache_ttl=5):
        self.db = db_conn
        self.redis = redis
        self.cache_ttl = cache_ttl  # Короткий TTL для fast expiry
 
    async def create_product(self, product_id: str, name: str, price: int, stock: int) -> dict:
        await self.db.execute(
            "INSERT INTO products (product_id, name, price, stock) VALUES ($1, $2, $3, $4)",
            product_id, name, price, stock
        )
 
        product = {"product_id": product_id, "name": name, "price": price, "stock": stock}
 
        # Короткий TTL
        await self.redis.setex(f"product:{product_id}", self.cache_ttl, json.dumps(product))
 
        return product
 
    async def get_product(self, product_id: str) -> dict:
        cached = await self.redis.get(f"product:{product_id}")
        if cached:
            return json.loads(cached)
 
        row = await self.db.fetchrow(
            "SELECT product_id, name, price, stock FROM products WHERE product_id = $1",
            product_id
        )
 
        if not row:
            raise KeyError(f"Product {product_id} not found")
 
        product = dict(row)
 
        # Короткий TTL для быстрого обновления
        await self.redis.setex(f"product:{product_id}", self.cache_ttl, json.dumps(product))
 
        return product
 
    async def update_product_price(self, product_id: str, new_price: int) -> None:
        """
        ✅ TTL-based: обновляем только DB, НЕ трогаем cache.
 
        Cache обновится автоматически через TTL expiry.
        """
        await self.db.execute(
            "UPDATE products SET price = $1, updated_at = NOW() WHERE product_id = $2",
            new_price, product_id
        )
 
        # НЕ инвалидируем и НЕ обновляем cache!
        # Полагаемся на TTL expiry (5 секунд)
 
 
@pytest.fixture
async def ttl_service(db_connection, redis_client):
    """Service с TTL-based pattern (1 sec TTL для тестов)"""
    return ProductServiceTTL(db_connection, redis_client, cache_ttl=1)
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_ttl_eventual_consistency(ttl_service):
    """
    Тест #1: TTL pattern → eventual consistency.
 
    Timeline:
    - t=0: create product (price=$100)
    - t=0.1: update price to $50
    - t=0.2: read → stale $100 (cache not expired yet)
    - t=1.1: read → fresh $50 (cache expired, read from DB)
    """
    service = ttl_service
 
    # Create
    await service.create_product("webcam-ttl", "Webcam", 10000, 50)
 
    # Update immediately
    await service.update_product_price("webcam-ttl", 5000)
 
    # Read сразу после update (cache ещё не expired)
    product_stale = await service.get_product("webcam-ttl")
 
    # ⚠️ Может быть stale (зависит от timing)
    is_stale = product_stale["price"] == 10000
    is_fresh = product_stale["price"] == 5000
    assert is_stale or is_fresh, f"Unexpected price: {product_stale['price']}"
 
    if is_stale:
        print(f"\n⏱️  Stale read detected (expected with TTL pattern)")
 
        # Ждём expiry
        await asyncio.sleep(1.5)
 
        # Теперь cache expired → fresh from DB
        product_fresh = await service.get_product("webcam-ttl")
        assert product_fresh["price"] == 5000
        print(f"✅ After TTL expiry: fresh data from DB")
 
    else:
        print(f"\n✅ Got fresh data immediately (cache already expired)")
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_ttl_acceptable_staleness_window(ttl_service):
    """
    Тест #2: TTL pattern приемлем если staleness window короткий.
 
    Use case: показываем "примерную" цену, не критично если отстаёт на 1-5 сек.
    """
    service = ttl_service
 
    await service.create_product("charger-ttl", "Charger", 3000, 300)
 
    # Множественные updates
    for new_price in range(3000, 3100, 10):
        await service.update_product_price("charger-ttl", new_price)
 
    # Читаем сразу: либо cached 3000 (stale), либо 3090 из DB (если TTL истёк)
    product = await service.get_product("charger-ttl")

    # Других значений быть не может: updates не трогают cache, а DB уже содержит 3090
    assert product["price"] in (3000, 3090)

    # Staleness ограничен TTL (1 сек в тестах) — для product listings это OK
    print(f"\n📊 TTL pattern suitable for:")
    print(f"   - Product listings (approximate prices)")
    print(f"   - Analytics dashboards (near-real-time)")
    print(f"   - Non-critical data (news feeds, recommendations)")
    print(f"   ")
    print(f"   NOT suitable for:")
    print(f"   - Payments (need exact price)")
    print(f"   - Inventory (can't sell out-of-stock items)")
    print(f"   - User balances (need accuracy)")
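
У TTL-подхода есть побочный риск из разряда cache stampede: ключи, записанные одновременно, одновременно и истекают, и в DB уходит волна запросов. Частичная защита — случайный разброс TTL. Набросок (имя функции и параметры — условность примера):

```python
import random


def ttl_with_jitter(base_ttl: int, jitter_pct: float = 0.2) -> int:
    """TTL ± случайный разброс, чтобы ключи не истекали синхронно."""
    jitter = base_ttl * jitter_pct
    return max(1, int(base_ttl + random.uniform(-jitter, jitter)))


# Использование в get_product / create_product:
#   await self.redis.setex(key, ttl_with_jitter(self.cache_ttl), json.dumps(product))
```

Разброс в ±20% почти не меняет staleness window, но «размазывает» cache misses во времени.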

Trade-offs таблица:

| Strategy      | Consistency | Performance            | Complexity | Use Cases                                |
|---------------|-------------|------------------------|------------|------------------------------------------|
| Cache Aside   | ✅ Strong   | 🟡 Medium (cache miss) | 🟢 Simple  | Критичные данные (payments, inventory)   |
| Write Through | ✅ Strong   | ✅ High (no miss)      | 🟡 Medium  | Часто читаемые, редко обновляемые        |
| TTL-based     | 🟡 Eventual | ✅ High                | 🟢 Simple  | Некритичные данные (listings, analytics) |

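Таблицу можно свернуть в грубую эвристику выбора стратегии (набросок автора примера, а не универсальное правило — в реальном проекте решение зависит ещё и от нагрузки и SLA):

```python
def pick_cache_strategy(critical: bool, read_heavy: bool) -> str:
    """Грубая эвристика по таблице trade-offs выше."""
    if not critical:
        return "ttl"            # eventual consistency приемлема
    if read_heavy:
        return "write-through"  # часто читаем, редко пишем
    return "cache-aside"        # критичные данные, простая инвалидация


# Примеры: payments → cache-aside, product catalog → write-through,
# recommendations → ttl
```
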
Шаг 6. Production patterns

Distributed cache invalidation

Проблема: Несколько серверов используют cache, как синхронизировать invalidation?

Решение: Redis Pub/Sub

class ProductServiceWithPubSub:
    """
    Distributed cache invalidation через Redis Pub/Sub.
 
    Когда один сервер обновляет продукт:
    1. Update DB
    2. Delete local cache
    3. Publish "product:invalidated:{product_id}" в Redis
    4. Другие серверы получают сообщение → инвалидируют свой cache
    """
 
    def __init__(self, db_conn, redis):
        self.db = db_conn
        self.redis = redis
        self.local_cache = {}  # In-memory cache для примера
 
    async def start_listening(self):
        """Subscribe на invalidation events"""
        pubsub = self.redis.pubsub()
        await pubsub.subscribe("product:invalidations")
 
        async for message in pubsub.listen():
            if message["type"] == "message":
                # redis-py отдаёт bytes, если client создан без decode_responses=True
                product_id = message["data"]
                if isinstance(product_id, bytes):
                    product_id = product_id.decode()
                # Инвалидируем local cache
                self.local_cache.pop(product_id, None)
 
    async def update_product_price(self, product_id: str, new_price: int) -> None:
        # Update DB
        await self.db.execute(
            "UPDATE products SET price = $1 WHERE product_id = $2",
            new_price, product_id
        )
 
        # Invalidate local cache
        self.local_cache.pop(product_id, None)
 
        # Invalidate Redis cache
        await self.redis.delete(f"product:{product_id}")
 
        # ✅ Notify other servers
        await self.redis.publish("product:invalidations", product_id)
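
Сам сценарий «один сервер публикует, другие инвалидируют» можно проверить без Redis — на in-memory шине (`FakeBus` и `Server` — условные заглушки, имитирующие publish/subscribe):

```python
import asyncio


class FakeBus:
    """Упрощённая замена Redis Pub/Sub: рассылает сообщение всем подписчикам."""

    def __init__(self):
        self.subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        q = asyncio.Queue()
        self.subscribers.append(q)
        return q

    async def publish(self, message: str):
        for q in self.subscribers:
            await q.put(message)


class Server:
    """Сервер с local in-memory cache, слушающий invalidation events."""

    def __init__(self, bus: FakeBus):
        self.local_cache = {}
        self.queue = bus.subscribe()

    async def listen_once(self):
        product_id = await self.queue.get()
        self.local_cache.pop(product_id, None)


bus = FakeBus()
server_a, server_b = Server(bus), Server(bus)


async def main():
    server_a.local_cache["p1"] = {"price": 99}
    server_b.local_cache["p1"] = {"price": 99}

    server_a.local_cache.pop("p1", None)  # сервер A обновил DB → чистит свой cache
    await bus.publish("p1")               # ...и оповещает остальных
    await server_b.listen_once()          # сервер B получает event и инвалидирует


asyncio.run(main())
assert "p1" not in server_b.local_cache
```

В production на месте `FakeBus` стоит Redis Pub/Sub, а `listen_once` — это фоновая задача с `pubsub.listen()`, как в классе выше.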

Monitoring & Metrics

import json

from prometheus_client import Counter, Histogram, Gauge
 
# Cache metrics
cache_hits = Counter("cache_hits_total", "Cache hits", ["cache_key_prefix"])
cache_misses = Counter("cache_misses_total", "Cache misses", ["cache_key_prefix"])
cache_ttl_expired = Counter("cache_ttl_expired_total", "Cache entries expired due to TTL")
stale_reads = Counter("cache_stale_reads_total", "Stale cache reads detected")
 
cache_get_duration = Histogram(
    "cache_get_duration_seconds",
    "Time spent getting from cache",
    ["status"]  # hit or miss
)
 
cache_size = Gauge("cache_size_bytes", "Current cache size")
 
 
class MonitoredProductService:
    """Product service с полной observability"""
 
    async def get_product(self, product_id: str) -> dict:
        import time
 
        start = time.time()
 
        cached = await self.redis.get(f"product:{product_id}")
 
        if cached:
            duration = time.time() - start
            cache_hits.labels(cache_key_prefix="product").inc()
            cache_get_duration.labels(status="hit").observe(duration)
            return json.loads(cached)
 
        duration = time.time() - start
        cache_misses.labels(cache_key_prefix="product").inc()
        cache_get_duration.labels(status="miss").observe(duration)
 
        # Fetch from DB...
        # (остальная логика)
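
Поверх этих счётчиков полезно считать hit rate и алертить на долю stale reads. Две вспомогательные функции (набросок; порог 1% — условное значение для примера):

```python
def cache_hit_rate(hits: int, misses: int) -> float:
    """Hit rate = hits / (hits + misses); 0.0, если трафика ещё не было."""
    total = hits + misses
    return hits / total if total else 0.0


def should_alert(stale_reads: int, total_reads: int, threshold: float = 0.01) -> bool:
    """Алерт, если доля stale reads превышает порог (по умолчанию 1%)."""
    return total_reads > 0 and stale_reads / total_reads > threshold
```

Пример из инцидента выше: hit rate 95% выглядел отлично, но `should_alert(5, 100)` уже сработал бы — именно поэтому stale reads нужна отдельная метрика, а не только hits/misses.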

Circuit Breaker для cache

class CacheWithCircuitBreaker:
    """
    Circuit breaker: если Redis падает, graceful degradation.
    """
 
    def __init__(self, redis, failure_threshold=5, timeout=60):
        self.redis = redis
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.state = "closed"  # closed | open | half-open
        self.last_failure_time = None
 
    async def get(self, key: str) -> str | None:
        import time

        if self.state == "open":
            # После timeout даём пробный запрос: open → half-open
            if time.time() - self.last_failure_time >= self.timeout:
                self.state = "half-open"
            else:
                # Circuit open → skip cache, go directly to DB
                return None

        try:
            value = await self.redis.get(key)
            self._on_success()
            return value

        except Exception:
            self._on_failure()
            # Graceful: return None (cache miss) вместо exception
            return None
 
    def _on_success(self):
        self.failure_count = 0
        if self.state == "half-open":
            self.state = "closed"
 
    def _on_failure(self):
        import time
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            self.last_failure_time = time.time()
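
State machine такого breaker'а легко проверить без Redis. Ниже — самодостаточный набросок: компактная версия того же класса (с переходом open → half-open по timeout) плюс заглушка «упавшего» Redis:

```python
import asyncio
import time


class FailingRedis:
    """Заглушка: всегда бросает исключение, как недоступный Redis."""

    async def get(self, key):
        raise ConnectionError("redis down")


class MiniBreaker:
    """Компактная версия CacheWithCircuitBreaker для проверки переходов состояний."""

    def __init__(self, redis, failure_threshold=3, timeout=0.1):
        self.redis = redis
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.state = "closed"
        self.last_failure_time = None

    async def get(self, key):
        if self.state == "open":
            if time.time() - self.last_failure_time >= self.timeout:
                self.state = "half-open"  # пробный запрос после паузы
            else:
                return None               # мгновенный fallback на DB
        try:
            value = await self.redis.get(key)
            self.failure_count = 0
            if self.state == "half-open":
                self.state = "closed"
            return value
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
                self.last_failure_time = time.time()
            return None


async def main():
    breaker = MiniBreaker(FailingRedis())
    for _ in range(3):
        assert await breaker.get("k") is None
    assert breaker.state == "open"   # 3 ошибки подряд → open

    await asyncio.sleep(0.15)        # ждём timeout
    await breaker.get("k")           # half-open → снова fail
    assert breaker.state == "open"   # Redis всё ещё лежит → обратно в open


asyncio.run(main())
```

Важно: при отказе кеша `get` возвращает `None` (как cache miss), и сервис прозрачно уходит в DB — приложение деградирует по latency, но не падает.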

📊 Business Impact & ROI

Problem cost (до фикса)

Situation (Black Friday incident):

  • Stale cache duration: 60 seconds (original TTL)
  • Affected customers: 234 in 2 hours
  • Revenue loss: $15,000 (abandoned carts)
  • Refunds: $8,000
  • Support overhead: $1,600
  • Engineering time: $1,200

Single incident cost: $25,800

Annual extrapolation:

  • Frequency: 1 incident/month (sales, price updates)
  • Annual cost: $309,600

Solution cost

Implementation:

  • Choose strategy (Cache Aside): 2 hours
  • Implement invalidation logic: 4 hours
  • Write tests: 3 hours
  • Deploy & validate: 2 hours
  • Total: 11h × $100/h = $1,100

ROI

Annual savings:

  • Incidents eliminated: $309,600/year
  • Investment: $1,100

ROI: ≈28,045% ((309,600 − 1,100) / 1,100)
Break-even: 1.3 дня
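
Арифметику легко перепроверить по числам из раздела выше:

```python
# Числа из Business Impact выше
incident_cost = 15_000 + 8_000 + 1_600 + 1_200   # single incident: 25_800
annual_cost = incident_cost * 12                  # 1 инцидент/месяц: 309_600
investment = 11 * 100                             # 11h × $100/h: 1_100

roi_pct = (annual_cost - investment) / investment * 100
break_even_days = investment / annual_cost * 365

print(f"ROI: {roi_pct:,.0f}%")                    # → ROI: 28,045%
print(f"Break-even: {break_even_days:.1f} дня")   # → Break-even: 1.3 дня
```
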

Additional benefits:

  • ✅ Customer trust restored
  • ✅ No more support escalations
  • ✅ Faster checkout (correct prices first time)

Production checklist

  • Strategy selected: Cache Aside / Write Through / TTL-based
  • Tests written: Stale cache scenarios covered
  • Monitoring: Cache hit/miss metrics, staleness detection
  • Circuit breaker: Graceful degradation if Redis fails
  • TTL tuning: Balance freshness vs performance
  • Distributed invalidation: Pub/Sub if multi-server
  • Load testing: Verify under peak load
  • Runbook: What to do when cache inconsistency detected

Что вынести с урока

  1. Stale cache — реальная проблема, не corner case
  2. Cache Aside — золотой стандарт для критичных данных
  3. Write Through — для hot data (часто читаем, редко пишем)
  4. TTL-based — для некритичных данных (eventual consistency OK)
  5. Trade-offs matter: Consistency vs Performance vs Complexity
  6. Monitoring обязателен: Stale reads нужно видеть в метриках
  7. Circuit breaker для resilience: Cache падает → work without cache

Главное отличие от "учебных примеров":

  • ❌ Не игнорируем race conditions ("cache just works")
  • ✅ Показываем реальный Black Friday incident
  • ✅ Воспроизводим все 3 типа cache races
  • ✅ Даём 3 production-ready стратегии с trade-offs
  • ✅ Измеряем ROI: $309K annual savings

В следующем уроке: Observability для расследования flaky-тестов — Prometheus метрики, OpenTelemetry трейсы, structured logging.
