Race Conditions в многопоточном коде: threading.Lock
Понимание race conditions на простых примерах с threading. Счетчик, корзина покупок, диагностика гонок. Для тех, кто не работает с async.
Table of Contents
Этот материал был частью продвинутого курса по pytest, но был вынесен отдельно. Если вы работаете с многопоточным кодом (threading), но не с async/await — этот гайд для вас.
Для кого этот материал?
Что такое race condition?
Race condition — ситуация когда результат зависит от порядка выполнения параллельных операций. Порядок непредсказуем → результат непредсказуем.
Простая аналогия:
Два человека одновременно открывают холодильник, видят что молока нет, идут в магазин и покупают молоко. Результат: два пакета молока вместо одного.
Воспроизводим race condition
Наивный счетчик
# counter.py
counter = 0
def increment():
"""НЕ потокобезопасная функция"""
global counter
# Искусственная задержка чтобы гонка проявилась
current = counter
import time
time.sleep(0.0001) # 0.1ms задержка
counter = current + 1Проблема: counter += 1 кажется атомарным, но это 3 операции:
- READ: прочитать
counterиз памяти - ADD: прибавить 1
- WRITE: записать обратно
Гонка возникает когда два потока делают это одновременно:
Thread 1: READ counter (0)
Thread 2: READ counter (0) ← оба прочитали 0!
Thread 1: ADD 1 (0 + 1 = 1)
Thread 2: ADD 1 (0 + 1 = 1) ← оба считают 0 + 1
Thread 1: WRITE 1
Thread 2: WRITE 1 ← перезаписывает результат Thread 1!
Результат: counter = 1 (ожидали 2)
Тест с одним потоком (работает)
# tests/test_counter_single.py
from counter import counter, increment
def test_increment_single_thread():
"""Один поток — всё ОК"""
global counter
counter = 0
for _ in range(100):
increment()
assert counter == 100 # ✅ PASSТест с несколькими потоками (гонка!)
# tests/test_counter_race.py
import threading
from counter import counter, increment
def test_increment_race_condition():
"""Несколько потоков — гонка!"""
global counter
counter = 0
threads = []
for _ in range(10):
t = threading.Thread(target=lambda: [increment() for _ in range(10)])
threads.append(t)
t.start()
for t in threads:
t.join()
# Ожидаем 10 потоков × 10 инкрементов = 100
assert counter == 100 # ❌ FAIL!Запускаем:
pytest tests/test_counter_race.py -vРезультат:
AssertionError: assert 73 == 100
# или 81, или 92... каждый раз разное!
Это flaky test — тест падает нестабильно.
Диагностика race conditions
Признак #1: Нестабильный результат
# Запускаем тест 10 раз
for i in {1..10}; do
pytest tests/test_counter_race.py -v --tb=no -q
doneРезультат:
PASSED (counter=100)
FAILED (counter=87)
PASSED (counter=100)
FAILED (counter=91)
...
Вывод: если результат меняется от запуска к запуску — это race condition.
Признак #2: Зависит от количества потоков
# 2 потока: 95% успешных запусков
# 10 потоков: 60% успешных
# 50 потоков: 20% успешныхВывод: чем больше параллелизм, тем чаще проявляется гонка.
Признак #3: Зависит от timing
def increment_with_delay(delay=0):
global counter
current = counter
time.sleep(delay) # Искусственная задержка
counter = current + 1
# delay=0: 80% успешных
# delay=0.001: 30% успешных ← гонка проявляется чаще!Исправление: threading.Lock
Правильная реализация:
# counter_safe.py
import threading
counter = 0
lock = threading.Lock()
def increment_safe():
"""Потокобезопасная функция"""
global counter
with lock: # Только один поток может быть здесь
current = counter
import time
time.sleep(0.0001)
counter = current + 1Как работает Lock:
Thread 1: lock.acquire() ✅ Получил lock
Thread 2: lock.acquire() ⏳ Ждёт...
Thread 1: counter += 1
Thread 1: lock.release() ✅ Освободил lock
Thread 2: lock.acquire() ✅ Теперь Thread 2 получил lock
Thread 2: counter += 1
Thread 2: lock.release()
Тест с Lock:
import threading
from counter_safe import counter, increment_safe
def test_increment_with_lock():
"""Lock гарантирует правильный результат"""
global counter
counter = 0
threads = []
for _ in range(10):
t = threading.Thread(target=lambda: [increment_safe() for _ in range(10)])
threads.append(t)
t.start()
for t in threads:
t.join()
assert counter == 100 # ✅ PASS (всегда!)Запускаем 10 раз:
for i in {1..10}; do
pytest tests/test_counter_safe.py -v --tb=no -q
doneРезультат: ✅ PASS (10/10) — стабильно!
Практика: корзина покупок
Реальный пример: два пользователя добавляют товары в корзину одновременно.
Наивная реализация (с гонкой)
# cart.py
class ShoppingCart:
def __init__(self):
self.items = []
def add_item(self, item_id: str):
"""НЕ потокобезопасный метод"""
# Проверяем что товара ещё нет
if item_id not in [i["id"] for i in self.items]:
import time
time.sleep(0.001) # Имитация задержки сети/БД
self.items.append({"id": item_id, "quantity": 1})
else:
# Товар есть — увеличиваем количество
for item in self.items:
if item["id"] == item_id:
item["quantity"] += 1
def get_item_count(self, item_id: str) -> int:
"""Сколько раз товар добавлен"""
for item in self.items:
if item["id"] == item_id:
return item["quantity"]
return 0Тест (воспроизводим гонку)
# tests/test_cart_race.py
import threading
from cart import ShoppingCart
def test_concurrent_add_item():
"""Два потока добавляют один товар одновременно"""
cart = ShoppingCart()
def add_milk():
cart.add_item("milk")
# Два потока добавляют "milk" одновременно
t1 = threading.Thread(target=add_milk)
t2 = threading.Thread(target=add_milk)
t1.start()
t2.start()
t1.join()
t2.join()
# Ожидаем quantity=2
assert cart.get_item_count("milk") == 2 # ❌ FAIL!
# Реальность: quantity=1 или даже 2 товара с quantity=1Проблема:
Thread 1: проверяет "milk not in items" → True
Thread 2: проверяет "milk not in items" → True (оба видят пустую корзину!)
Thread 1: добавляет {"id": "milk", "quantity": 1}
Thread 2: добавляет {"id": "milk", "quantity": 1} ← дубликат!
Результат: два товара "milk" в корзине вместо одного с quantity=2
Исправление с Lock
# cart_safe.py
import threading
class ShoppingCartSafe:
def __init__(self):
self.items = []
self.lock = threading.Lock()
def add_item(self, item_id: str):
"""Потокобезопасный метод"""
with self.lock: # Только один поток внутри
if item_id not in [i["id"] for i in self.items]:
import time
time.sleep(0.001)
self.items.append({"id": item_id, "quantity": 1})
else:
for item in self.items:
if item["id"] == item_id:
item["quantity"] += 1Тест с Lock:
from cart_safe import ShoppingCartSafe
def test_concurrent_add_item_safe():
cart = ShoppingCartSafe()
def add_milk():
cart.add_item("milk")
t1 = threading.Thread(target=add_milk)
t2 = threading.Thread(target=add_milk)
t1.start()
t2.start()
t1.join()
t2.join()
assert cart.get_item_count("milk") == 2 # ✅ PASS
assert len(cart.items) == 1 # Один товар, не дубликатТестирование race conditions в pytest
Техника #1: Запускаем много раз
pip install pytest-repeat
pytest tests/test_race.py --count=100Техника #2: Увеличиваем задержки
def increment():
current = counter
time.sleep(0.001) # Больше задержка = чаще гонка
counter = current + 1Техника #3: Больше потоков
# Не 2, а 50 потоков
threads = [threading.Thread(target=increment) for _ in range(50)]Типичные ошибки
Ошибка #1: Недостаточно итераций
# ❌ Запускаем 1 раз → может не проявиться
result = run_concurrent_operation()
assert result == expectedИсправление: используйте pytest-repeat:
pytest tests/test_race.py --count=100Ошибка #2: Нет задержки
def increment():
counter += 1 # ❌ Слишком быстро, гонка не проявляетсяИсправление: добавьте time.sleep():
def increment():
current = counter
time.sleep(0.0001) # Окно для гонки
counter = current + 1Ошибка #3: Мало потоков
# ❌ 2 потока — гонка редкая
threads = [threading.Thread(target=increment) for _ in range(2)]Исправление: используйте 10-50 потоков.
Когда НЕ использовать threading
Альтернативы threading.Lock
1. concurrent.futures (проще)
from concurrent.futures import ThreadPoolExecutor
def process():
# Ваша функция
pass
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(process) for _ in range(100)]
results = [f.result() for f in futures]2. queue.Queue (для producer-consumer)
import queue
import threading
q = queue.Queue()
def worker():
while True:
item = q.get()
if item is None:
break
process(item)
q.task_done()
# Запускаем workers
threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
t.start()
# Добавляем задачи
for item in items:
q.put(item)
q.join() # Ждем завершения всех задачСледующий шаг: async
Если ваше приложение использует async/await, не используйте threading. Переходите к async-тестированию:
- Real Python: Async IO in Python
- Практикуйте 4+ часов с asyncio
- Затем: Pytest: Борьба с Race Conditions в Async-коде
Дополнительные материалы
Что вы узнали
- Race condition — результат зависит от порядка операций
- Flaky test — тест падает нестабильно из-за гонки
- threading.Lock — исправляет гонки в многопоточном коде
- Диагностика — нестабильный результат, зависит от потоков/timing
- Тестирование — pytest-repeat, задержки, много потоков