Pytest: Fighting Race Conditions in Async Code

Idempotency: Double Charges

55 minutes

This lesson brings together everything covered so far: race conditions (lessons 1-4) and Redis locks (lessons 5-6). We tackle a critical production problem: double charging on retry.

The main focus is idempotency: guaranteeing that an operation is executed exactly once, even in the face of retries, network failures, and concurrent requests.

Production Incident #3421

Report from the Payments Team:

"Multiple customers report double charges during Black Friday sale. Payment gateway timeouts caused frontend to retry requests. Both original and retry requests succeeded. 234 affected customers, $47,000 in duplicate charges. Support overwhelmed with refund requests."

Timeline:

  • 10:23 AM: Payment gateway latency spike (95th percentile: 8s → timeout)
  • 10:24 AM: Frontend auto-retries failed payments
  • 10:24 AM: Gateway processes BOTH original + retry requests
  • 10:25 AM: First customer complaint: "Charged twice!"
  • 10:45 AM: Pattern recognized, payment endpoint disabled
  • 11:30 AM: Incident resolved, manual refunds started
  • 3 days: Full investigation + fixes deployed

Root cause: No idempotency keys, no deduplication, naive retry logic.

Incident cost:

  • Direct refunds: $47,000
  • Support time: 40 hours × $40/h = $1,600
  • Engineering investigation: 80 hours × $100/h = $8,000
  • Reputation damage: viral Twitter thread (500K views)
  • Total estimated cost: $57,000+

Extrapolated annual cost: at one such incident per quarter, that's $228,000/year.

What is idempotency

Idempotency is the guarantee that an operation produces the same result no matter how many times it is executed with the same parameters.

Mathematically:

f(x) = f(f(x)) = f(f(f(x))) = ...

In distributed systems:

  • One request with idempotency_key="abc123" → one transaction
  • 10 requests with the same key → the same single transaction
  • The result is deterministic and reproducible (see the sketch below)
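
A minimal sketch of this contract, using a hypothetical in-memory result store (not the lesson's service), just to make the property concrete:

import uuid

_results = {}  # hypothetical in-memory store: idempotency_key → result


def charge(idempotency_key: str, amount: int) -> dict:
    """Process the charge once per key; repeated calls return the stored result."""
    if idempotency_key not in _results:
        _results[idempotency_key] = {"payment_id": str(uuid.uuid4()), "amount": amount}
    return _results[idempotency_key]


first = charge("abc123", 10000)
retry = charge("abc123", 10000)
assert first == retry  # repeated call with the same key → the same single transaction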

Why this is critical:

  1. Networks are unreliable: packets get lost, latency spikes, timeouts
  2. Clients retry: almost any HTTP client retries on timeout
  3. At-least-once delivery: message queues guarantee delivery but may deliver duplicates
  4. Concurrent requests: the user clicks "Pay" twice (double-click)

Industry examples:

  • Stripe: idempotency keys on all mutating requests (via the Idempotency-Key header)
  • AWS: many APIs are idempotent or accept client tokens (EC2, S3, Lambda)
  • Kubernetes: declarative operations (apply is idempotent)
  • Terraform: terraform apply can be run repeatedly
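
For example, a Stripe call can carry an idempotency key so that retries are deduplicated server-side. A sketch assuming the official stripe Python SDK (the API key, amount, and parameters are placeholders; check Stripe's current docs):

import uuid
import stripe  # official Stripe SDK, assumed to be installed

stripe.api_key = "sk_test_..."  # placeholder test key

# The client generates the key once and reuses it for every retry of this request.
key = str(uuid.uuid4())

intent = stripe.PaymentIntent.create(
    amount=10000,          # $100.00 in cents
    currency="usd",
    idempotency_key=key,   # Stripe deduplicates retries that carry the same key
)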

Setup

We continue with the e-commerce project, reusing the PostgreSQL + Redis setup from lessons 01-02.

Extend the schema for payments:

-- schema.sql (appended to the existing schema)
CREATE TABLE IF NOT EXISTS payments (
    id SERIAL PRIMARY KEY,
    payment_id VARCHAR(36) UNIQUE NOT NULL,
    user_id VARCHAR(36) NOT NULL,
    amount INTEGER NOT NULL,  -- in cents
    status VARCHAR(20) NOT NULL,  -- pending, completed, failed
    idempotency_key VARCHAR(64),
    created_at TIMESTAMP DEFAULT NOW(),
    completed_at TIMESTAMP
);
 
CREATE INDEX idx_payments_idempotency_key ON payments(idempotency_key);
CREATE INDEX idx_payments_user_id ON payments(user_id);

Apply it:

docker exec -i pytest-postgres psql -U postgres -d todo_test < schema.sql

Step 1. Reproducing the double charge

Create tests/integration/test_idempotency_naive.py. We will reproduce Production Incident #3421.

import asyncio
import asyncpg
import pytest
import uuid
from datetime import datetime
 
 
class NaivePaymentService:
    """
    ОПАСНАЯ реализация: нет idempotency, нет deduplication!
 
    Проблема: retry → duplicate charge.
    """
 
    def __init__(self, db_conn, redis):
        self.db = db_conn
        self.redis = redis
 
    async def charge(self, user_id: str, amount: int) -> dict:
        """
        Списывает деньги с пользователя.
 
        ПРОБЛЕМА: каждый вызов создаёт НОВУЮ транзакцию!
        """
        payment_id = str(uuid.uuid4())
 
        # 1. Create the payment record
        await self.db.execute(
            """
            INSERT INTO payments (payment_id, user_id, amount, status)
            VALUES ($1, $2, $3, 'pending')
            """,
            payment_id, user_id, amount
        )
 
        # 2. Simulate a (slow) call to the payment gateway
        await asyncio.sleep(0.05)  # Network latency
 
        # 3. Mark the charge as completed
        await self.db.execute(
            """
            UPDATE payments
            SET status = 'completed', completed_at = NOW()
            WHERE payment_id = $1
            """,
            payment_id
        )
 
        return {
            "payment_id": payment_id,
            "user_id": user_id,
            "amount": amount,
            "status": "completed"
        }
 
    async def get_user_total_charged(self, user_id: str) -> int:
        """Сколько всего списано с пользователя"""
        row = await self.db.fetchrow(
            """
            SELECT COALESCE(SUM(amount), 0) as total
            FROM payments
            WHERE user_id = $1 AND status = 'completed'
            """,
            user_id
        )
        return row["total"]
 
 
# FIXTURES
 
@pytest.fixture(scope="session")
async def db_pool():
    pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        user="postgres",
        password="testpass",
        database="todo_test",
        min_size=2,
        max_size=10
    )
    yield pool
    await pool.close()
 
 
@pytest.fixture
async def db_connection(db_pool):
    async with db_pool.acquire() as conn:
        tx = conn.transaction()
        await tx.start()
        yield conn
        await tx.rollback()
 
 
@pytest.fixture
async def redis_client():
    import aioredis
    redis = await aioredis.from_url(
        "redis://localhost:6379",
        decode_responses=True
    )
    yield redis
    await redis.flushdb()
    await redis.close()
 
 
@pytest.fixture
async def naive_payment_service(db_connection, redis_client):
    return NaivePaymentService(db_connection, redis_client)
 
 
# TESTS THAT REPRODUCE THE DOUBLE CHARGE
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_double_charge_on_retry(naive_payment_service):
    """
    Test #1: the simplest double charge scenario.
 
    Scenario:
    1. User clicks "Pay $100"
    2. Request times out (slow gateway)
    3. Frontend retries
    4. Both requests succeed → $200 charged instead of $100
    """
    service = naive_payment_service
    user_id = "user_123"
    amount = 10000  # $100.00
 
    async def charge_with_timeout():
        """Simulates a frontend retry on timeout"""
        # The backend keeps processing even after the client gives up,
        # so run the charge as a task and shield it from cancellation.
        original = asyncio.create_task(service.charge(user_id, amount))
        try:
            return await asyncio.wait_for(
                asyncio.shield(original),
                timeout=0.03  # Times out before charge() finishes (0.05s)
            )
        except asyncio.TimeoutError:
            # Frontend retry!
            return await service.charge(user_id, amount)
 
    # Execute
    result = await charge_with_timeout()
    assert result["status"] == "completed"
 
    # Check how much was charged
    total_charged = await service.get_user_total_charged(user_id)
 
    # ❌ BUG: $200 charged instead of $100!
    assert total_charged == amount, (
        f"Double charge detected! Expected ${amount/100}, "
        f"but charged ${total_charged/100}. "
        f"This is Production Incident #3421"
    )
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_concurrent_double_click(naive_payment_service):
    """
    Test #2: the user clicks "Pay" twice (impatient user).
 
    Real scenario:
    - Slow payment page
    - User clicks "Pay" → nothing happens (loading...)
    - User clicks again → both requests go through!
    """
    service = naive_payment_service
    user_id = "user_456"
    amount = 5000  # $50.00
 
    # Two simultaneous requests (double-click)
    results = await asyncio.gather(
        service.charge(user_id, amount),
        service.charge(user_id, amount)
    )
 
    # Both succeed
    assert all(r["status"] == "completed" for r in results)
 
    # Check the total
    total_charged = await service.get_user_total_charged(user_id)
 
    # ❌ BUG: $100 instead of $50!
    assert total_charged == amount, (
        f"Double charge from concurrent requests! "
        f"Expected ${amount/100}, charged ${total_charged/100}"
    )
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_message_queue_duplicate_delivery(naive_payment_service):
    """
    Test #3: the message queue delivers the message twice.
 
    Real scenario:
    - Payment processed via async queue (Celery, RabbitMQ, SQS)
    - Queue guarantees "at-least-once" delivery
    - Consumer crashes after processing but before ACK
    - Message redelivered → duplicate charge!
    """
    service = naive_payment_service
    user_id = "user_789"
    amount = 15000  # $150.00
 
    async def process_payment_message(message_id: str):
        """Имитирует queue consumer"""
        # Process payment
        result = await service.charge(user_id, amount)
 
        # Simulate a crash before the ACK
        if message_id == "msg-1":
            # First processing succeeded, but the ACK was never sent.
            # The queue will redeliver the message.
            pass
 
        return result
 
    # First delivery
    await process_payment_message("msg-1")
 
    # Redelivery (queue thinks first attempt failed)
    await process_payment_message("msg-1")
 
    # Check
    total_charged = await service.get_user_total_charged(user_id)
 
    # ❌ BUG: $300 instead of $150!
    assert total_charged == amount, (
        f"Duplicate charge from queue redelivery! "
        f"Expected ${amount/100}, charged ${total_charged/100}"
    )
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_load_balancer_retry(naive_payment_service):
    """
    Test #4: the load balancer retries the request on another instance.
 
    Real scenario:
    - Request → Instance A (starts processing)
    - Instance A slow to respond
    - Load balancer timeout → retry to Instance B
    - Both A and B complete → double charge!
    """
    service = naive_payment_service
    user_id = "user_999"
    amount = 20000  # $200.00
 
    async def instance_a():
        """Instance A: медленно обрабатывает"""
        await asyncio.sleep(0.01)  # Slow start
        return await service.charge(user_id, amount)
 
    async def instance_b():
        """Instance B: быстро обрабатывает retry"""
        await asyncio.sleep(0.005)  # Faster
        return await service.charge(user_id, amount)
 
    # The load balancer ends up sending both requests
    results = await asyncio.gather(instance_a(), instance_b())
 
    # Both succeed
    assert all(r["status"] == "completed" for r in results)
 
    # Check
    total_charged = await service.get_user_total_charged(user_id)
 
    # ❌ BUG: $400 instead of $200!
    assert total_charged == amount, (
        f"Duplicate charge from load balancer retry! "
        f"Expected ${amount/100}, charged ${total_charged/100}"
    )

Run:

pytest tests/integration/test_idempotency_naive.py -v
 
# ❌ Expected result: 4/4 FAILED

Expected failures:

FAILED test_double_charge_on_retry - AssertionError:
    Double charge detected! Expected $100, but charged $200
 
FAILED test_concurrent_double_click - AssertionError:
    Double charge from concurrent requests! Expected $50, charged $100
 
FAILED test_message_queue_duplicate_delivery - AssertionError:
    Duplicate charge from queue redelivery! Expected $150, charged $300
 
FAILED test_load_balancer_retry - AssertionError:
    Duplicate charge from load balancer retry! Expected $200, charged $400

We have reproduced all four scenarios from Production Incident #3421.

Step 2. Fix #1: Idempotency Keys

The solution: each request carries a unique idempotency key. Repeated requests with the same key return the same result.

# tests/integration/test_idempotency_keys.py
import asyncio
import asyncpg
import pytest
import uuid
import json
import hashlib
 
 
class IdempotentPaymentService:
    """
    Production-ready implementation with idempotency keys.
 
    Strategy:
    1. Client generates idempotency_key (UUID or hash of request params)
    2. Server checks if key already processed (Redis cache)
    3. If yes: return cached result (no duplicate charge)
    4. If no: process payment, cache result
    """
 
    def __init__(self, db_conn, redis):
        self.db = db_conn
        self.redis = redis
 
    async def charge(
        self,
        user_id: str,
        amount: int,
        idempotency_key: str
    ) -> dict:
        """
        ✅ Idempotent charge with an exactly-once guarantee.

        Args:
            idempotency_key: A unique key supplied by the client (required!)
        """
        # 1. Check the cache (already processed?)
        cache_key = f"payment:idempotency:{idempotency_key}"
        cached = await self.redis.get(cache_key)
 
        if cached:
            # ✅ Return the cached result (no duplicate!)
            result = json.loads(cached)
            result["from_cache"] = True
            return result
 
        # 2. Check the DB (in case the cache entry expired)
        existing = await self.db.fetchrow(
            """
            SELECT payment_id, user_id, amount, status
            FROM payments
            WHERE idempotency_key = $1
            """,
            idempotency_key
        )
 
        if existing:
            # ✅ Already processed earlier (cache expired, but the row is in the DB)
            result = dict(existing)
            result["from_cache"] = False
            result["from_db"] = True
 
            # Refresh the cache
            await self.redis.setex(cache_key, 3600, json.dumps(result))
 
            return result
 
        # 3. First request with this key → process it
        payment_id = str(uuid.uuid4())
 
        # Create the payment WITH the idempotency_key
        await self.db.execute(
            """
            INSERT INTO payments (payment_id, user_id, amount, status, idempotency_key)
            VALUES ($1, $2, $3, 'pending', $4)
            """,
            payment_id, user_id, amount, idempotency_key
        )
 
        # Process (slow)
        await asyncio.sleep(0.05)
 
        # Complete the payment
        await self.db.execute(
            """
            UPDATE payments
            SET status = 'completed', completed_at = NOW()
            WHERE payment_id = $1
            """,
            payment_id
        )
 
        result = {
            "payment_id": payment_id,
            "user_id": user_id,
            "amount": amount,
            "status": "completed",
            "from_cache": False,
            "from_db": False
        }
 
        # 4. Cache the result (TTL: 1 hour)
        await self.redis.setex(cache_key, 3600, json.dumps(result))
 
        return result
 
    async def get_user_total_charged(self, user_id: str) -> int:
        row = await self.db.fetchrow(
            """
            SELECT COALESCE(SUM(amount), 0) as total
            FROM payments
            WHERE user_id = $1 AND status = 'completed'
            """,
            user_id
        )
        return row["total"]
 
 
def generate_idempotency_key(user_id: str, amount: int, intent: str = "payment") -> str:
    """
    Generates an idempotency key from the request parameters.

    Best practice: hash of user_id + amount + intent + timestamp.
    Note: the timestamp makes each generated key unique, so the client must
    generate the key once, store it locally, and reuse it for every retry.
    """
    from datetime import datetime
    timestamp = datetime.now().isoformat()
    data = f"{intent}:{user_id}:{amount}:{timestamp}"
    return hashlib.sha256(data.encode()).hexdigest()
 
 
@pytest.fixture
async def idempotent_payment_service(db_connection, redis_client):
    return IdempotentPaymentService(db_connection, redis_client)
 
 
# TESTS THAT VERIFY IDEMPOTENCY
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_idempotency_prevents_double_charge(idempotent_payment_service):
    """
    Test #1: an idempotency key prevents a double charge on retry.
    """
    service = idempotent_payment_service
    user_id = "user_123"
    amount = 10000
    idempotency_key = generate_idempotency_key(user_id, amount)
 
    # First request
    result1 = await service.charge(user_id, amount, idempotency_key)
    assert result1["status"] == "completed"
    assert result1["from_cache"] is False
 
    # Retry with the SAME key
    result2 = await service.charge(user_id, amount, idempotency_key)
    assert result2["status"] == "completed"
    assert result2["from_cache"] is True  # ← Served from the cache!
    assert result2["payment_id"] == result1["payment_id"]  # ← The same transaction!
 
    # Verify the user was charged only ONCE
    total = await service.get_user_total_charged(user_id)
    assert total == amount  # ✅ $100, not $200!
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_idempotency_under_concurrent_requests(idempotent_payment_service):
    """
    Test #2: idempotency holds under concurrent requests (double-click).
    """
    service = idempotent_payment_service
    user_id = "user_456"
    amount = 5000
    idempotency_key = generate_idempotency_key(user_id, amount)
 
    # 10 simultaneous requests with ONE key
    results = await asyncio.gather(*[
        service.charge(user_id, amount, idempotency_key)
        for _ in range(10)
    ])
 
    # All returned success
    assert all(r["status"] == "completed" for r in results)
 
    # All returned the SAME payment_id
    payment_ids = [r["payment_id"] for r in results]
    assert len(set(payment_ids)) == 1  # ← Only one unique ID!
 
    # Charged only once
    total = await service.get_user_total_charged(user_id)
    assert total == amount  # ✅ $50, not $500!
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_different_keys_create_separate_payments(idempotent_payment_service):
    """
    Test #3: different idempotency keys → separate transactions.
    """
    service = idempotent_payment_service
    user_id = "user_789"
    amount = 15000
 
    # Two different keys
    key1 = generate_idempotency_key(user_id, amount, intent="payment-1")
    key2 = generate_idempotency_key(user_id, amount, intent="payment-2")
 
    # Two requests
    result1 = await service.charge(user_id, amount, key1)
    result2 = await service.charge(user_id, amount, key2)
 
    # Different transactions
    assert result1["payment_id"] != result2["payment_id"]
 
    # Charged TWICE (which is correct: different keys!)
    total = await service.get_user_total_charged(user_id)
    assert total == amount * 2  # ✅ $300 (two separate transactions)
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_idempotency_survives_cache_expiry(idempotent_payment_service):
    """
    Test #4: idempotency survives even if the Redis cache has expired.

    Scenario:
    1. Payment processed, cached with a TTL
    2. Cache expires (or Redis restarts)
    3. Retry with the same key
    4. The service reads from the DB → no duplicate!
    """
    service = idempotent_payment_service
    user_id = "user_999"
    amount = 20000
    idempotency_key = generate_idempotency_key(user_id, amount)
 
    # First request
    result1 = await service.charge(user_id, amount, idempotency_key)
    assert result1["from_cache"] is False
 
    # Simulate expiry: delete the key from Redis
    cache_key = f"payment:idempotency:{idempotency_key}"
    await service.redis.delete(cache_key)
 
    # Retry after expiry
    result2 = await service.charge(user_id, amount, idempotency_key)
 
    # The result comes from the DB (not the cache!)
    assert result2["from_db"] is True
    assert result2["payment_id"] == result1["payment_id"]
 
    # Still a single transaction
    total = await service.get_user_total_charged(user_id)
    assert total == amount  # ✅ $200, not $400!

Run:

pytest tests/integration/test_idempotency_keys.py -v
 
# ✅ PASS (4/4): all tests green!

Why this works:

  1. Idempotency key: a unique ID supplied by the client
  2. Cache first: check Redis (fast)
  3. DB fallback: if the cache entry has expired, read from the DB
  4. Deduplication: one key = one transaction

Step 3. Fix #2: Redis Distributed Lock

The problem with cache-aside: there is a race condition between the cache check and the DB write:

Timeline:
t=0: Request A checks cache → miss
t=0: Request B checks cache → miss (concurrent!)
t=1: Request A creates payment in DB
t=1: Request B creates payment in DB ← DUPLICATE!

The solution: a distributed lock around the critical section.

# tests/integration/test_idempotency_with_lock.py
import asyncio
import asyncpg
import pytest
import uuid
import json
from contextlib import asynccontextmanager
 
 
class RedisLock:
    """
    Distributed lock via Redis SET NX EX.

    Guarantees that only one process executes the critical section.
    """
 
    def __init__(self, redis, lock_key: str, timeout: int = 10):
        self.redis = redis
        self.lock_key = f"lock:{lock_key}"
        self.timeout = timeout
        self.lock_value = str(uuid.uuid4())  # Unique value for a safe unlock
 
    async def acquire(self) -> bool:
        """
        Try to acquire the lock.

        Returns: True if the lock was acquired, False if it is held by someone else.
        """
        # SET key value NX EX timeout
        # NX = only set if not exists
        # EX = expiry time
        result = await self.redis.set(
            self.lock_key,
            self.lock_value,
            ex=self.timeout,
            nx=True  # Only if not exists
        )
        return result is not None
 
    async def release(self):
        """
        Release the lock (only if we own it).
        """
        # Lua script for an atomic check-and-delete
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        await self.redis.eval(lua_script, 1, self.lock_key, self.lock_value)
 
    @asynccontextmanager
    async def __call__(self):
        """Context manager для удобства"""
        # Spin-lock: keep trying to acquire the lock
        acquired = False
        for attempt in range(50):  # Max 5 seconds
            acquired = await self.acquire()
            if acquired:
                break
            await asyncio.sleep(0.1)
 
        if not acquired:
            raise TimeoutError(f"Could not acquire lock {self.lock_key}")
 
        try:
            yield
        finally:
            await self.release()
 
 
class LockedIdempotentPaymentService:
    """
    Production-ready idempotency with a distributed lock.

    Eliminates the race condition completely.
    """
 
    def __init__(self, db_conn, redis):
        self.db = db_conn
        self.redis = redis
 
    async def charge(
        self,
        user_id: str,
        amount: int,
        idempotency_key: str
    ) -> dict:
        """
        ✅ Idempotent charge guarded by a distributed lock.
        """
        cache_key = f"payment:idempotency:{idempotency_key}"
 
        # 1. Fast path: check the cache without the lock
        cached = await self.redis.get(cache_key)
        if cached:
            result = json.loads(cached)
            result["from_cache"] = True
            return result
 
        # 2. Cache miss → acquire the lock
        lock = RedisLock(self.redis, f"payment:{idempotency_key}", timeout=10)
 
        async with lock():
            # 3. Double-check after acquiring the lock
            cached = await self.redis.get(cache_key)
            if cached:
                result = json.loads(cached)
                result["from_cache"] = True
                result["after_lock"] = True
                return result
 
            # 4. Check the DB
            existing = await self.db.fetchrow(
                """
                SELECT payment_id, user_id, amount, status
                FROM payments
                WHERE idempotency_key = $1
                """,
                idempotency_key
            )
 
            if existing:
                result = dict(existing)
                result["from_db"] = True
                await self.redis.setex(cache_key, 3600, json.dumps(result))
                return result
 
            # 5. CRITICAL SECTION (protected by the lock)
            payment_id = str(uuid.uuid4())
 
            await self.db.execute(
                """
                INSERT INTO payments (payment_id, user_id, amount, status, idempotency_key)
                VALUES ($1, $2, $3, 'pending', $4)
                """,
                payment_id, user_id, amount, idempotency_key
            )
 
            await asyncio.sleep(0.05)  # Process payment
 
            await self.db.execute(
                """
                UPDATE payments SET status = 'completed', completed_at = NOW()
                WHERE payment_id = $1
                """,
                payment_id
            )
 
            result = {
                "payment_id": payment_id,
                "user_id": user_id,
                "amount": amount,
                "status": "completed",
                "from_cache": False,
                "from_db": False
            }
 
            # 6. Cache result
            await self.redis.setex(cache_key, 3600, json.dumps(result))
 
            return result
 
    async def get_user_total_charged(self, user_id: str) -> int:
        row = await self.db.fetchrow(
            """
            SELECT COALESCE(SUM(amount), 0) as total
            FROM payments
            WHERE user_id = $1 AND status = 'completed'
            """,
            user_id
        )
        return row["total"]
 
 
@pytest.fixture
async def locked_payment_service(db_connection, redis_client):
    return LockedIdempotentPaymentService(db_connection, redis_client)
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_lock_prevents_race_condition(locked_payment_service):
    """
    Test #1: the lock prevents the race on a cache miss.

    Scenario:
    - 100 concurrent requests with the same idempotency_key
    - All of them try to create the payment at the same time
    - The lock guarantees: only one creates it, the rest wait
    """
    service = locked_payment_service
    user_id = "user_race"
    amount = 1000
    idempotency_key = "test-key-race-100"
 
    # 100 parallel requests
    results = await asyncio.gather(*[
        service.charge(user_id, amount, idempotency_key)
        for _ in range(100)
    ])
 
    # All succeed
    assert all(r["status"] == "completed" for r in results)
 
    # All returned the SAME payment_id
    payment_ids = [r["payment_id"] for r in results]
    assert len(set(payment_ids)) == 1
 
    # Only one transaction in the DB
    total = await service.get_user_total_charged(user_id)
    assert total == amount  # ✅ $10, not $1,000!
 
 
@pytest.mark.asyncio
@pytest.mark.integration
async def test_lock_timeout_handling(locked_payment_service):
    """
    Test #2: the lock timeout is handled correctly.

    Scenario:
    - Request A acquires the lock
    - Request B waits for the lock
    - The lock expires → B acquires it
    """
    service = locked_payment_service
    user_id = "user_timeout"
    amount = 5000
    idempotency_key = "test-key-timeout"
 
    # Manually create a "stuck" lock
    lock_key = f"lock:payment:{idempotency_key}"
    await service.redis.set(lock_key, "stuck", ex=1)  # 1 sec TTL
 
    # Wait for it to expire
    await asyncio.sleep(1.5)
 
    # Now we can acquire the lock
    result = await service.charge(user_id, amount, idempotency_key)
    assert result["status"] == "completed"

Run:

pytest tests/integration/test_idempotency_with_lock.py -v
 
# ✅ PASS (2/2)

Step 4. Production patterns

Client-side idempotency key generation

# Frontend example (JavaScript)
function generateIdempotencyKey(userId, amount, intent) {
    // Option 1: UUID (the client generates a random ID)
    const uuid = crypto.randomUUID();
    localStorage.setItem(`payment_${intent}`, uuid);
    return uuid;
 
    // Option 2: hash derived from the parameters (unreachable here; shown as an alternative)
    const timestamp = Date.now();
    const data = `${intent}:${userId}:${amount}:${timestamp}`;
    return sha256(data);
}
 
// Usage
async function submitPayment(userId, amount) {
    // Generate the key BEFORE sending the request
    const idempotencyKey = generateIdempotencyKey(userId, amount, "checkout");
 
    try {
        const response = await fetch("/api/payments", {
            method: "POST",
            headers: {
                "Content-Type": "application/json",
                "Idempotency-Key": idempotencyKey  // ← Header!
            },
            body: JSON.stringify({ userId, amount })
        });
 
        return await response.json();
 
    } catch (error) {
        // Retry with the SAME key!
        if (error.name === "NetworkError") {
            return await fetch("/api/payments", {
                method: "POST",
                headers: {
                    "Idempotency-Key": idempotencyKey  // ← Тот же ключ!
                },
                body: JSON.stringify({ userId, amount })
            });
        }
        throw error;
    }
}
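
The server-side counterpart reads the key from the Idempotency-Key header and passes it to the payment service. A minimal sketch, assuming a FastAPI app and the LockedIdempotentPaymentService from Step 3 (the endpoint path and the get_payment_service dependency are hypothetical):

from typing import Optional

from fastapi import Depends, FastAPI, Header, HTTPException
from pydantic import BaseModel

app = FastAPI()


class ChargeRequest(BaseModel):
    user_id: str
    amount: int  # in cents


async def get_payment_service():
    """Hypothetical dependency that returns a configured payment service."""
    ...


@app.post("/api/payments")
async def create_payment(
    body: ChargeRequest,
    idempotency_key: Optional[str] = Header(default=None, alias="Idempotency-Key"),
    service=Depends(get_payment_service),
):
    # Mutations must be retry-safe, so refuse requests without a key.
    if not idempotency_key:
        raise HTTPException(status_code=400, detail="Idempotency-Key header is required")
    return await service.charge(body.user_id, body.amount, idempotency_key)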

Idempotency key lifecycle

import json
from datetime import datetime, timedelta


class IdempotencyKeyManager:
    """
    Manages the lifecycle of idempotency keys.
    """
 
    def __init__(self, redis):
        self.redis = redis
 
    async def mark_processing(self, key: str) -> bool:
        """
        Mark the key as "being processed".

        Returns: False if it is already being processed.
        """
        processing_key = f"processing:{key}"
        result = await self.redis.set(processing_key, "1", ex=300, nx=True)
        return result is not None
 
    async def mark_completed(self, key: str, result: dict):
        """
        Mark the key as completed and store the result.
        """
        # Remove processing mark
        await self.redis.delete(f"processing:{key}")
 
        # Store result with long TTL (24h)
        result_key = f"payment:idempotency:{key}"
        await self.redis.setex(result_key, 86400, json.dumps(result))
 
    async def cleanup_expired(self):
        """
        Background job: clean up stale keys.
        """
        # Delete keys older than 7 days
        cutoff = datetime.now() - timedelta(days=7)
        # ... cleanup logic
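
A possible way to wire the manager into a charge flow (a sketch; charge_once and the service argument are hypothetical, not part of the lesson's code):

async def charge_once(manager: IdempotencyKeyManager, service, user_id, amount, key):
    """Hypothetical wrapper: claim the key, charge, then record the result."""
    # Refuse to start if another worker is already processing this key.
    if not await manager.mark_processing(key):
        raise RuntimeError(f"Idempotency key {key} is already being processed")
    result = await service.charge(user_id, amount, key)
    await manager.mark_completed(key, result)
    return result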

Monitoring & Alerts

from prometheus_client import Counter, Histogram
 
# Idempotency metrics
idempotency_hits = Counter(
    "payment_idempotency_hits_total",
    "Idempotent request hits (duplicate requests)",
    ["source"]  # cache or db
)
 
idempotency_misses = Counter(
    "payment_idempotency_misses_total",
    "New unique idempotency keys"
)
 
payment_lock_wait_duration = Histogram(
    "payment_lock_wait_seconds",
    "Time spent waiting for distributed lock"
)
 
 
class MonitoredIdempotentPaymentService:
    """Service с observability"""
 
    async def charge(self, user_id, amount, idempotency_key):
        # Check cache
        cached = await self.redis.get(f"payment:idempotency:{idempotency_key}")
        if cached:
            idempotency_hits.labels(source="cache").inc()
            return json.loads(cached)
 
        # Check DB
        existing = await self.db.fetchrow(
            "SELECT * FROM payments WHERE idempotency_key = $1",
            idempotency_key
        )
        if existing:
            idempotency_hits.labels(source="db").inc()
            return dict(existing)
 
        # New request
        idempotency_misses.inc()
 
        # ... process payment

📊 Business Impact & ROI

Problem cost (before the fix)

Situation (Production Incident #3421):

  • Double charges: $47,000
  • Support time: 40h × $40/h = $1,600
  • Engineering: 80h × $100/h = $8,000
  • Reputation damage: unknown

Single incident: $57,000

Annual extrapolation:

  • Frequency: once per quarter
  • Annual cost: $228,000

Solution cost

Implementation:

  • Design idempotency strategy: 4 hours
  • Implement with Redis locks: 8 hours
  • Write tests: 4 hours
  • Deploy & validate: 4 hours
  • Total: 20h × $100/h = $2,000

ROI

Annual savings:

  • Incidents eliminated: $228,000/year
  • Investment: $2,000

ROI: 11,300% (($228,000 - $2,000) / $2,000). Break-even: 3.2 days ($2,000 / $228,000 × 365).

Additional benefits:

  • ✅ Zero duplicate charges
  • ✅ Customer trust restored
  • ✅ Compliance ready (financial regulations)
  • ✅ Retry-safe API (clients can retry without fear)

Production checklist

  • Idempotency keys are mandatory for all mutations
  • The client generates keys (not the server!)
  • Keys are stored in the DB + Redis cache
  • Distributed locks around critical sections
  • Monitoring: idempotency hit rate, duplicates prevented
  • TTL strategy: cache 1h, DB 7 days, cleanup job
  • Tests: concurrent requests, retries, queue redelivery
  • Documentation: API docs describe the Idempotency-Key header

Key takeaways

  1. Idempotency is not optional in distributed systems
  2. The client generates keys; the server accepts and verifies them
  3. Cache + DB: two layers of protection
  4. A distributed lock eliminates the race condition completely
  5. Test under concurrency: 100 parallel requests must pass
  6. Stripe-style API is the industry standard (idempotency key in a header)
  7. The ROI is huge: $228K annual savings for a $2K investment

How this differs from "textbook examples":

  • ❌ We do not wave the problem away ("retries are rare")
  • ✅ We show a real $47K incident
  • ✅ We reproduce 4 scenarios (timeout, double-click, queue, LB)
  • ✅ We provide 2 levels of fix (keys + locks)
  • ✅ Production patterns (lifecycle, monitoring, cleanup)

Next lesson: a custom pytest plugin that automatically detects flaky and slow tests, CI integration, quality gates.
