Skip to main content
Back to course
Pytest: Борьба с Race Conditions в Async-коде
4 / 944%

Гонки в PostgreSQL: lost update и грязное чтение

45 минут

В этом уроке переходим от in-memory гонок (asyncio.Lock) к гонкам на уровне базы данных. Это самый частый источник production-багов в асинхронных приложениях.

Главная цель: научиться распознавать и фиксить race conditions в PostgreSQL через правильные транзакции и блокировки.

Проблема: Lost Update

Production кейс:

Два async worker'а одновременно обновляют счетчик просмотров статьи:

# Наивная реализация (с гонкой)
async def increment_views(article_id: str, db_connection):
    # 1. Читаем текущее значение
    result = await db_connection.fetchrow(
        "SELECT views FROM articles WHERE id = $1",
        article_id
    )
    current_views = result["views"]
 
    # 2. Задержка (имитация логики)
    await asyncio.sleep(0.01)
 
    # 3. Записываем увеличенное значение
    await db_connection.execute(
        "UPDATE articles SET views = $1 WHERE id = $2",
        current_views + 1,
        article_id
    )

Гонка:

Worker 1: SELECT views → 100
Worker 2: SELECT views → 100  ← оба прочитали 100!
Worker 1: UPDATE views = 101
Worker 2: UPDATE views = 101  ← перезаписал результат Worker 1!
 
Результат: views = 101 (ожидали 102)

Это называется Lost Update — одно обновление потерялось.

Воспроизводим в тесте

Подготовка:

-- schema.sql
CREATE TABLE IF NOT EXISTS articles (
    id VARCHAR(36) PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    views INTEGER DEFAULT 0
);

Async фикстура:

# conftest.py
import pytest
import asyncpg
 
@pytest.fixture
async def db_connection():
    """Async connection с rollback"""
    conn = await asyncpg.connect(
        "postgresql://postgres:testpass@localhost:5432/todo_test"
    )
 
    # Начинаем транзакцию
    await conn.execute("BEGIN")
 
    yield conn
 
    # Откатываем после теста
    await conn.execute("ROLLBACK")
    await conn.close()
 
@pytest.fixture
async def article_id(db_connection):
    """Создаём тестовую статью"""
    article_id = "test-article-123"
    await db_connection.execute(
        "INSERT INTO articles (id, title, views) VALUES ($1, $2, $3)",
        article_id, "Test Article", 0
    )
    return article_id

Тест (воспроизводим гонку):

# tests/test_article_race.py
import pytest
import asyncio
 
async def increment_views_naive(article_id: str, conn):
    """НЕ thread-safe версия"""
    result = await conn.fetchrow(
        "SELECT views FROM articles WHERE id = $1",
        article_id
    )
    current_views = result["views"]
 
    # Задержка чтобы гонка проявилась
    await asyncio.sleep(0.01)
 
    await conn.execute(
        "UPDATE articles SET views = $1 WHERE id = $2",
        current_views + 1,
        article_id
    )
 
@pytest.mark.asyncio
async def test_lost_update(db_connection, article_id):
    """Воспроизводим Lost Update"""
    # Запускаем 10 параллельных инкрементов
    await asyncio.gather(*[
        increment_views_naive(article_id, db_connection)
        for _ in range(10)
    ])
 
    # Ожидаем views = 10
    result = await db_connection.fetchrow(
        "SELECT views FROM articles WHERE id = $1",
        article_id
    )
    assert result["views"] == 10  # ❌ FAIL! (views = 3-7)

Запускаем:

pytest tests/test_article_race.py -v

Результат:

AssertionError: assert 6 == 10
# Или 4, или 8... каждый раз разное!

Исправление #1: Атомарный UPDATE

Правильная реализация:

async def increment_views_atomic(article_id: str, conn):
    """Thread-safe версия через атомарный UPDATE"""
    await conn.execute(
        "UPDATE articles SET views = views + 1 WHERE id = $1",
        article_id
    )
    # PostgreSQL гарантирует атомарность этой операции

Почему работает:

PostgreSQL выполняет UPDATE views = views + 1 атомарно:

  1. Блокирует строку
  2. Читает views
  3. Увеличивает на 1
  4. Записывает
  5. Освобождает блокировку

Тест с атомарным UPDATE:

@pytest.mark.asyncio
async def test_atomic_update(db_connection, article_id):
    """Атомарный UPDATE без гонки"""
    await asyncio.gather(*[
        increment_views_atomic(article_id, db_connection)
        for _ in range(10)
    ])
 
    result = await db_connection.fetchrow(
        "SELECT views FROM articles WHERE id = $1",
        article_id
    )
    assert result["views"] == 10  # ✅ PASS (всегда!)

Исправление #2: SELECT FOR UPDATE

Если нужна более сложная логика (не просто +1), используем SELECT FOR UPDATE:

async def increment_views_with_logic(article_id: str, conn):
    """
    Thread-safe версия с кастомной логикой.
    Например: не инкрементируем если views > 1000.
    """
    # SELECT FOR UPDATE блокирует строку для других транзакций
    result = await conn.fetchrow(
        "SELECT views FROM articles WHERE id = $1 FOR UPDATE",
        article_id
    )
    current_views = result["views"]
 
    # Кастомная логика
    if current_views < 1000:
        new_views = current_views + 1
    else:
        new_views = current_views  # Не инкрементируем
 
    await conn.execute(
        "UPDATE articles SET views = $1 WHERE id = $2",
        new_views,
        article_id
    )
    # Блокировка снимается после commit транзакции

Как работает FOR UPDATE:

Worker 1: SELECT ... FOR UPDATE → блокирует строку
Worker 2: SELECT ... FOR UPDATE → ждёт освобождения ⏳
Worker 1: UPDATE, COMMIT → освобождает блокировку
Worker 2: теперь может прочитать актуальные данные

Тест:

@pytest.mark.asyncio
async def test_select_for_update(db_connection, article_id):
    """SELECT FOR UPDATE гарантирует изоляцию"""
    await asyncio.gather(*[
        increment_views_with_logic(article_id, db_connection)
        for _ in range(10)
    ])
 
    result = await db_connection.fetchrow(
        "SELECT views FROM articles WHERE id = $1",
        article_id
    )
    assert result["views"] == 10  # ✅ PASS

Проблема #2: Dirty Read

Ситуация: один worker читает данные, которые другой worker ещё не закоммитил (и может откатить).

Пример:

# Worker 1 (незакоммиченная транзакция)
await conn.execute("UPDATE articles SET views = 999 WHERE id = $1", article_id)
# НЕТ COMMIT!
 
# Worker 2 (читает)
result = await conn2.fetchrow("SELECT views FROM articles WHERE id = $1", article_id)
# Что прочитаем: 0 (старое) или 999 (новое незакоммиченное)?

Зависит от isolation level:

  • READ UNCOMMITTED — прочитаем 999 (грязное чтение)
  • READ COMMITTED (default) — прочитаем 0 (стабильное)
  • REPEATABLE READ — прочитаем 0 + защита от phantom reads
  • SERIALIZABLE — полная изоляция

Тест на Dirty Read:

@pytest.mark.asyncio
async def test_no_dirty_read():
    """PostgreSQL default isolation не допускает dirty reads"""
    conn1 = await asyncpg.connect(...)
    conn2 = await asyncpg.connect(...)
 
    await conn1.execute("BEGIN")
    await conn2.execute("BEGIN")
 
    article_id = "test-article"
    await conn1.execute("INSERT INTO articles (id, views) VALUES ($1, 0)", article_id)
 
    # conn1: обновляем но НЕ коммитим
    await conn1.execute("UPDATE articles SET views = 999 WHERE id = $1", article_id)
 
    # conn2: пытаемся прочитать
    result = await conn2.fetchrow("SELECT views FROM articles WHERE id = $1", article_id)
 
    # PostgreSQL READ COMMITTED: conn2 НЕ видит незакоммиченные данные
    assert result is None  # ✅ Dirty read не произошёл
 
    await conn1.execute("ROLLBACK")
    await conn2.execute("ROLLBACK")
    await conn1.close()
    await conn2.close()

Практика: система бронирования мест

Реальный кейс: два пользователя бронируют последнее место в кинотеатре.

Схема:

CREATE TABLE seats (
    id SERIAL PRIMARY KEY,
    seat_number VARCHAR(10) UNIQUE NOT NULL,
    booked BOOLEAN DEFAULT FALSE,
    booked_by VARCHAR(100)
);

Наивная реализация (с гонкой):

async def book_seat_naive(seat_number: str, user: str, conn):
    """НЕ thread-safe версия"""
    result = await conn.fetchrow(
        "SELECT booked FROM seats WHERE seat_number = $1",
        seat_number
    )
 
    if not result["booked"]:
        await asyncio.sleep(0.01)  # Имитация задержки
        await conn.execute(
            "UPDATE seats SET booked = TRUE, booked_by = $1 WHERE seat_number = $2",
            user, seat_number
        )
        return True  # Успешно забронировали
    return False  # Место занято

Тест (гонка):

@pytest.mark.asyncio
async def test_double_booking(db_connection):
    """Два пользователя бронируют одно место"""
    await db_connection.execute(
        "INSERT INTO seats (seat_number, booked) VALUES ($1, FALSE)",
        "A1"
    )
 
    results = await asyncio.gather(
        book_seat_naive("A1", "Alice", db_connection),
        book_seat_naive("A1", "Bob", db_connection)
    )
 
    # Оба должны получить True? НЕТ!
    assert results.count(True) == 1  # ❌ FAIL! Оба получили True

Исправление с SELECT FOR UPDATE:

async def book_seat_safe(seat_number: str, user: str, conn):
    """Thread-safe версия"""
    result = await conn.fetchrow(
        "SELECT booked FROM seats WHERE seat_number = $1 FOR UPDATE",
        seat_number
    )
 
    if not result["booked"]:
        await conn.execute(
            "UPDATE seats SET booked = TRUE, booked_by = $1 WHERE seat_number = $2",
            user, seat_number
        )
        return True
    return False

Тест (без гонки):

@pytest.mark.asyncio
async def test_safe_booking(db_connection):
    """SELECT FOR UPDATE предотвращает двойное бронирование"""
    await db_connection.execute(
        "INSERT INTO seats (seat_number, booked) VALUES ($1, FALSE)",
        "A1"
    )
 
    results = await asyncio.gather(
        book_seat_safe("A1", "Alice", db_connection),
        book_seat_safe("A1", "Bob", db_connection)
    )
 
    # Только один получил место
    assert results.count(True) == 1  # ✅ PASS
    assert results.count(False) == 1

Типичные ошибки

Ошибка #1: Забыли FOR UPDATE

# ❌ Без FOR UPDATE → гонка
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1", item_id)
if result["stock"] > 0:
    await conn.execute("UPDATE items SET stock = stock - 1 WHERE id = $1", item_id)

Исправление:

# ✅ С FOR UPDATE
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1 FOR UPDATE", item_id)

Ошибка #2: Использовали FOR UPDATE без транзакции

# ❌ FOR UPDATE работает только в транзакции!
conn = await asyncpg.connect(...)  # Нет BEGIN!
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1 FOR UPDATE", item_id)

Исправление:

await conn.execute("BEGIN")
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1 FOR UPDATE", item_id)
# ...
await conn.execute("COMMIT")

Ошибка #3: Долгая логика под блокировкой

# ❌ Медленная логика держит блокировку
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1 FOR UPDATE", item_id)
await send_email_notification()  # 3 секунды!
await conn.execute("UPDATE ...")

Проблема: другие worker'ы ждут 3 секунды.

Исправление: выполняйте медленную логику после COMMIT.

Что вы узнали

Lost Update — гонка при UPDATE, фиксится атомарными операциями ✅ SELECT FOR UPDATE — блокирует строку для других транзакций ✅ Dirty Read — чтение незакоммиченных данных (PostgreSQL защищает по умолчанию) ✅ Isolation levels — контролируют видимость данных между транзакциями

Следующий урок

Теперь вы понимаете гонки в PostgreSQL. В следующем уроке добавляем Redis и настраиваем фикстуры для кеша.

Переходите к уроку 5: Фундамент Redis: кеш и базовые операции

Гонки в PostgreSQL: lost update и грязное чтение — Pytest: Борьба с Race Conditions в Async-коде — Potapov.me