Гонки в PostgreSQL: lost update и грязное чтение
В этом уроке переходим от in-memory гонок (asyncio.Lock) к гонкам на уровне базы данных. Это самый частый источник production-багов в асинхронных приложениях.
Главная цель: научиться распознавать и фиксить race conditions в PostgreSQL через правильные транзакции и блокировки.
Проблема: Lost Update
Production кейс:
Два async worker'а одновременно обновляют счетчик просмотров статьи:
# Наивная реализация (с гонкой)
async def increment_views(article_id: str, db_connection):
# 1. Читаем текущее значение
result = await db_connection.fetchrow(
"SELECT views FROM articles WHERE id = $1",
article_id
)
current_views = result["views"]
# 2. Задержка (имитация логики)
await asyncio.sleep(0.01)
# 3. Записываем увеличенное значение
await db_connection.execute(
"UPDATE articles SET views = $1 WHERE id = $2",
current_views + 1,
article_id
)Гонка:
Worker 1: SELECT views → 100
Worker 2: SELECT views → 100 ← оба прочитали 100!
Worker 1: UPDATE views = 101
Worker 2: UPDATE views = 101 ← перезаписал результат Worker 1!
Результат: views = 101 (ожидали 102)Это называется Lost Update — одно обновление потерялось.
Воспроизводим в тесте
Подготовка:
-- schema.sql
CREATE TABLE IF NOT EXISTS articles (
id VARCHAR(36) PRIMARY KEY,
title VARCHAR(255) NOT NULL,
views INTEGER DEFAULT 0
);Async фикстура:
# conftest.py
import pytest
import asyncpg
@pytest.fixture
async def db_connection():
"""Async connection с rollback"""
conn = await asyncpg.connect(
"postgresql://postgres:testpass@localhost:5432/todo_test"
)
# Начинаем транзакцию
await conn.execute("BEGIN")
yield conn
# Откатываем после теста
await conn.execute("ROLLBACK")
await conn.close()
@pytest.fixture
async def article_id(db_connection):
"""Создаём тестовую статью"""
article_id = "test-article-123"
await db_connection.execute(
"INSERT INTO articles (id, title, views) VALUES ($1, $2, $3)",
article_id, "Test Article", 0
)
return article_idТест (воспроизводим гонку):
# tests/test_article_race.py
import pytest
import asyncio
async def increment_views_naive(article_id: str, conn):
"""НЕ thread-safe версия"""
result = await conn.fetchrow(
"SELECT views FROM articles WHERE id = $1",
article_id
)
current_views = result["views"]
# Задержка чтобы гонка проявилась
await asyncio.sleep(0.01)
await conn.execute(
"UPDATE articles SET views = $1 WHERE id = $2",
current_views + 1,
article_id
)
@pytest.mark.asyncio
async def test_lost_update(db_connection, article_id):
"""Воспроизводим Lost Update"""
# Запускаем 10 параллельных инкрементов
await asyncio.gather(*[
increment_views_naive(article_id, db_connection)
for _ in range(10)
])
# Ожидаем views = 10
result = await db_connection.fetchrow(
"SELECT views FROM articles WHERE id = $1",
article_id
)
assert result["views"] == 10 # ❌ FAIL! (views = 3-7)Запускаем:
pytest tests/test_article_race.py -vРезультат:
AssertionError: assert 6 == 10
# Или 4, или 8... каждый раз разное!Исправление #1: Атомарный UPDATE
Правильная реализация:
async def increment_views_atomic(article_id: str, conn):
"""Thread-safe версия через атомарный UPDATE"""
await conn.execute(
"UPDATE articles SET views = views + 1 WHERE id = $1",
article_id
)
# PostgreSQL гарантирует атомарность этой операцииПочему работает:
PostgreSQL выполняет UPDATE views = views + 1 атомарно:
- Блокирует строку
- Читает views
- Увеличивает на 1
- Записывает
- Освобождает блокировку
Тест с атомарным UPDATE:
@pytest.mark.asyncio
async def test_atomic_update(db_connection, article_id):
"""Атомарный UPDATE без гонки"""
await asyncio.gather(*[
increment_views_atomic(article_id, db_connection)
for _ in range(10)
])
result = await db_connection.fetchrow(
"SELECT views FROM articles WHERE id = $1",
article_id
)
assert result["views"] == 10 # ✅ PASS (всегда!)Исправление #2: SELECT FOR UPDATE
Если нужна более сложная логика (не просто +1), используем SELECT FOR UPDATE:
async def increment_views_with_logic(article_id: str, conn):
"""
Thread-safe версия с кастомной логикой.
Например: не инкрементируем если views > 1000.
"""
# SELECT FOR UPDATE блокирует строку для других транзакций
result = await conn.fetchrow(
"SELECT views FROM articles WHERE id = $1 FOR UPDATE",
article_id
)
current_views = result["views"]
# Кастомная логика
if current_views < 1000:
new_views = current_views + 1
else:
new_views = current_views # Не инкрементируем
await conn.execute(
"UPDATE articles SET views = $1 WHERE id = $2",
new_views,
article_id
)
# Блокировка снимается после commit транзакцииКак работает FOR UPDATE:
Worker 1: SELECT ... FOR UPDATE → блокирует строку
Worker 2: SELECT ... FOR UPDATE → ждёт освобождения ⏳
Worker 1: UPDATE, COMMIT → освобождает блокировку
Worker 2: теперь может прочитать актуальные данныеТест:
@pytest.mark.asyncio
async def test_select_for_update(db_connection, article_id):
"""SELECT FOR UPDATE гарантирует изоляцию"""
await asyncio.gather(*[
increment_views_with_logic(article_id, db_connection)
for _ in range(10)
])
result = await db_connection.fetchrow(
"SELECT views FROM articles WHERE id = $1",
article_id
)
assert result["views"] == 10 # ✅ PASSПроблема #2: Dirty Read
Ситуация: один worker читает данные, которые другой worker ещё не закоммитил (и может откатить).
Пример:
# Worker 1 (незакоммиченная транзакция)
await conn.execute("UPDATE articles SET views = 999 WHERE id = $1", article_id)
# НЕТ COMMIT!
# Worker 2 (читает)
result = await conn2.fetchrow("SELECT views FROM articles WHERE id = $1", article_id)
# Что прочитаем: 0 (старое) или 999 (новое незакоммиченное)?Зависит от isolation level:
READ UNCOMMITTED— прочитаем 999 (грязное чтение)READ COMMITTED(default) — прочитаем 0 (стабильное)REPEATABLE READ— прочитаем 0 + защита от phantom readsSERIALIZABLE— полная изоляция
Тест на Dirty Read:
@pytest.mark.asyncio
async def test_no_dirty_read():
"""PostgreSQL default isolation не допускает dirty reads"""
conn1 = await asyncpg.connect(...)
conn2 = await asyncpg.connect(...)
await conn1.execute("BEGIN")
await conn2.execute("BEGIN")
article_id = "test-article"
await conn1.execute("INSERT INTO articles (id, views) VALUES ($1, 0)", article_id)
# conn1: обновляем но НЕ коммитим
await conn1.execute("UPDATE articles SET views = 999 WHERE id = $1", article_id)
# conn2: пытаемся прочитать
result = await conn2.fetchrow("SELECT views FROM articles WHERE id = $1", article_id)
# PostgreSQL READ COMMITTED: conn2 НЕ видит незакоммиченные данные
assert result is None # ✅ Dirty read не произошёл
await conn1.execute("ROLLBACK")
await conn2.execute("ROLLBACK")
await conn1.close()
await conn2.close()Практика: система бронирования мест
Реальный кейс: два пользователя бронируют последнее место в кинотеатре.
Схема:
CREATE TABLE seats (
id SERIAL PRIMARY KEY,
seat_number VARCHAR(10) UNIQUE NOT NULL,
booked BOOLEAN DEFAULT FALSE,
booked_by VARCHAR(100)
);Наивная реализация (с гонкой):
async def book_seat_naive(seat_number: str, user: str, conn):
"""НЕ thread-safe версия"""
result = await conn.fetchrow(
"SELECT booked FROM seats WHERE seat_number = $1",
seat_number
)
if not result["booked"]:
await asyncio.sleep(0.01) # Имитация задержки
await conn.execute(
"UPDATE seats SET booked = TRUE, booked_by = $1 WHERE seat_number = $2",
user, seat_number
)
return True # Успешно забронировали
return False # Место занятоТест (гонка):
@pytest.mark.asyncio
async def test_double_booking(db_connection):
"""Два пользователя бронируют одно место"""
await db_connection.execute(
"INSERT INTO seats (seat_number, booked) VALUES ($1, FALSE)",
"A1"
)
results = await asyncio.gather(
book_seat_naive("A1", "Alice", db_connection),
book_seat_naive("A1", "Bob", db_connection)
)
# Оба должны получить True? НЕТ!
assert results.count(True) == 1 # ❌ FAIL! Оба получили TrueИсправление с SELECT FOR UPDATE:
async def book_seat_safe(seat_number: str, user: str, conn):
"""Thread-safe версия"""
result = await conn.fetchrow(
"SELECT booked FROM seats WHERE seat_number = $1 FOR UPDATE",
seat_number
)
if not result["booked"]:
await conn.execute(
"UPDATE seats SET booked = TRUE, booked_by = $1 WHERE seat_number = $2",
user, seat_number
)
return True
return FalseТест (без гонки):
@pytest.mark.asyncio
async def test_safe_booking(db_connection):
"""SELECT FOR UPDATE предотвращает двойное бронирование"""
await db_connection.execute(
"INSERT INTO seats (seat_number, booked) VALUES ($1, FALSE)",
"A1"
)
results = await asyncio.gather(
book_seat_safe("A1", "Alice", db_connection),
book_seat_safe("A1", "Bob", db_connection)
)
# Только один получил место
assert results.count(True) == 1 # ✅ PASS
assert results.count(False) == 1Типичные ошибки
Ошибка #1: Забыли FOR UPDATE
# ❌ Без FOR UPDATE → гонка
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1", item_id)
if result["stock"] > 0:
await conn.execute("UPDATE items SET stock = stock - 1 WHERE id = $1", item_id)Исправление:
# ✅ С FOR UPDATE
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1 FOR UPDATE", item_id)Ошибка #2: Использовали FOR UPDATE без транзакции
# ❌ FOR UPDATE работает только в транзакции!
conn = await asyncpg.connect(...) # Нет BEGIN!
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1 FOR UPDATE", item_id)Исправление:
await conn.execute("BEGIN")
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1 FOR UPDATE", item_id)
# ...
await conn.execute("COMMIT")Ошибка #3: Долгая логика под блокировкой
# ❌ Медленная логика держит блокировку
result = await conn.fetchrow("SELECT * FROM items WHERE id = $1 FOR UPDATE", item_id)
await send_email_notification() # 3 секунды!
await conn.execute("UPDATE ...")Проблема: другие worker'ы ждут 3 секунды.
Исправление: выполняйте медленную логику после COMMIT.
Что вы узнали
✅ Lost Update — гонка при UPDATE, фиксится атомарными операциями ✅ SELECT FOR UPDATE — блокирует строку для других транзакций ✅ Dirty Read — чтение незакоммиченных данных (PostgreSQL защищает по умолчанию) ✅ Isolation levels — контролируют видимость данных между транзакциями
Следующий урок
Теперь вы понимаете гонки в PostgreSQL. В следующем уроке добавляем Redis и настраиваем фикстуры для кеша.
Переходите к уроку 5: Фундамент Redis: кеш и базовые операции