Skip to main content
testing

Race Conditions в многопоточном коде: threading.Lock

Понимание race conditions на простых примерах с threading. Счетчик, корзина покупок, диагностика гонок. Для тех, кто не работает с async.

#threading#race-condition#concurrency#pytest#beginner

Этот материал был частью продвинутого курса по pytest, но был вынесен отдельно. Если вы работаете с многопоточным кодом (threading), но не с async/await — этот гайд для вас.

Для кого этот материал?

✅ Вы пишете многопоточный код (threading, concurrent.futures)
✅ Вы хотите понять race conditions на простых примерах
❌ Вы НЕ работаете с async/await (для async → основной курс)

Что такое race condition?

Race condition — ситуация когда результат зависит от порядка выполнения параллельных операций. Порядок непредсказуем → результат непредсказуем.

Простая аналогия:

Два человека одновременно открывают холодильник, видят что молока нет, идут в магазин и покупают молоко. Результат: два пакета молока вместо одного.

Воспроизводим race condition

Наивный счетчик

# counter.py
counter = 0
 
def increment():
    """НЕ потокобезопасная функция"""
    global counter
    # Искусственная задержка чтобы гонка проявилась
    current = counter
    import time
    time.sleep(0.0001)  # 0.1ms задержка
    counter = current + 1

Проблема: counter += 1 кажется атомарным, но это 3 операции:

  1. READ: прочитать counter из памяти
  2. ADD: прибавить 1
  3. WRITE: записать обратно

Гонка возникает когда два потока делают это одновременно:

Thread 1: READ counter (0)
Thread 2: READ counter (0)  ← оба прочитали 0!
Thread 1: ADD 1 (0 + 1 = 1)
Thread 2: ADD 1 (0 + 1 = 1)  ← оба считают 0 + 1
Thread 1: WRITE 1
Thread 2: WRITE 1  ← перезаписывает результат Thread 1!

Результат: counter = 1 (ожидали 2)

Тест с одним потоком (работает)

# tests/test_counter_single.py
from counter import counter, increment
 
def test_increment_single_thread():
    """Один поток — всё ОК"""
    global counter
    counter = 0
 
    for _ in range(100):
        increment()
 
    assert counter == 100  # ✅ PASS

Тест с несколькими потоками (гонка!)

# tests/test_counter_race.py
import threading
from counter import counter, increment
 
def test_increment_race_condition():
    """Несколько потоков — гонка!"""
    global counter
    counter = 0
 
    threads = []
    for _ in range(10):
        t = threading.Thread(target=lambda: [increment() for _ in range(10)])
        threads.append(t)
        t.start()
 
    for t in threads:
        t.join()
 
    # Ожидаем 10 потоков × 10 инкрементов = 100
    assert counter == 100  # ❌ FAIL!

Запускаем:

pytest tests/test_counter_race.py -v

Результат:

AssertionError: assert 73 == 100
# или 81, или 92... каждый раз разное!

Это flaky test — тест падает нестабильно.

Диагностика race conditions

Признак #1: Нестабильный результат

# Запускаем тест 10 раз
for i in {1..10}; do
  pytest tests/test_counter_race.py -v --tb=no -q
done

Результат:

PASSED (counter=100)
FAILED (counter=87)
PASSED (counter=100)
FAILED (counter=91)
...

Вывод: если результат меняется от запуска к запуску — это race condition.

Признак #2: Зависит от количества потоков

# 2 потока: 95% успешных запусков
# 10 потоков: 60% успешных
# 50 потоков: 20% успешных

Вывод: чем больше параллелизм, тем чаще проявляется гонка.

Признак #3: Зависит от timing

def increment_with_delay(delay=0):
    global counter
    current = counter
    time.sleep(delay)  # Искусственная задержка
    counter = current + 1
 
# delay=0: 80% успешных
# delay=0.001: 30% успешных  ← гонка проявляется чаще!

Исправление: threading.Lock

Правильная реализация:

# counter_safe.py
import threading
 
counter = 0
lock = threading.Lock()
 
def increment_safe():
    """Потокобезопасная функция"""
    global counter
    with lock:  # Только один поток может быть здесь
        current = counter
        import time
        time.sleep(0.0001)
        counter = current + 1

Как работает Lock:

Thread 1: lock.acquire() ✅ Получил lock
Thread 2: lock.acquire() ⏳ Ждёт...
Thread 1: counter += 1
Thread 1: lock.release()  ✅ Освободил lock
Thread 2: lock.acquire() ✅ Теперь Thread 2 получил lock
Thread 2: counter += 1
Thread 2: lock.release()

Тест с Lock:

import threading
from counter_safe import counter, increment_safe
 
def test_increment_with_lock():
    """Lock гарантирует правильный результат"""
    global counter
    counter = 0
 
    threads = []
    for _ in range(10):
        t = threading.Thread(target=lambda: [increment_safe() for _ in range(10)])
        threads.append(t)
        t.start()
 
    for t in threads:
        t.join()
 
    assert counter == 100  # ✅ PASS (всегда!)

Запускаем 10 раз:

for i in {1..10}; do
  pytest tests/test_counter_safe.py -v --tb=no -q
done

Результат: ✅ PASS (10/10) — стабильно!

Практика: корзина покупок

Реальный пример: два пользователя добавляют товары в корзину одновременно.

Наивная реализация (с гонкой)

# cart.py
class ShoppingCart:
    def __init__(self):
        self.items = []
 
    def add_item(self, item_id: str):
        """НЕ потокобезопасный метод"""
        # Проверяем что товара ещё нет
        if item_id not in [i["id"] for i in self.items]:
            import time
            time.sleep(0.001)  # Имитация задержки сети/БД
            self.items.append({"id": item_id, "quantity": 1})
        else:
            # Товар есть — увеличиваем количество
            for item in self.items:
                if item["id"] == item_id:
                    item["quantity"] += 1
 
    def get_item_count(self, item_id: str) -> int:
        """Сколько раз товар добавлен"""
        for item in self.items:
            if item["id"] == item_id:
                return item["quantity"]
        return 0

Тест (воспроизводим гонку)

# tests/test_cart_race.py
import threading
from cart import ShoppingCart
 
def test_concurrent_add_item():
    """Два потока добавляют один товар одновременно"""
    cart = ShoppingCart()
 
    def add_milk():
        cart.add_item("milk")
 
    # Два потока добавляют "milk" одновременно
    t1 = threading.Thread(target=add_milk)
    t2 = threading.Thread(target=add_milk)
 
    t1.start()
    t2.start()
    t1.join()
    t2.join()
 
    # Ожидаем quantity=2
    assert cart.get_item_count("milk") == 2  # ❌ FAIL!
    # Реальность: quantity=1 или даже 2 товара с quantity=1

Проблема:

Thread 1: проверяет "milk not in items" → True
Thread 2: проверяет "milk not in items" → True (оба видят пустую корзину!)
Thread 1: добавляет {"id": "milk", "quantity": 1}
Thread 2: добавляет {"id": "milk", "quantity": 1}  ← дубликат!

Результат: два товара "milk" в корзине вместо одного с quantity=2

Исправление с Lock

# cart_safe.py
import threading
 
class ShoppingCartSafe:
    def __init__(self):
        self.items = []
        self.lock = threading.Lock()
 
    def add_item(self, item_id: str):
        """Потокобезопасный метод"""
        with self.lock:  # Только один поток внутри
            if item_id not in [i["id"] for i in self.items]:
                import time
                time.sleep(0.001)
                self.items.append({"id": item_id, "quantity": 1})
            else:
                for item in self.items:
                    if item["id"] == item_id:
                        item["quantity"] += 1

Тест с Lock:

from cart_safe import ShoppingCartSafe
 
def test_concurrent_add_item_safe():
    cart = ShoppingCartSafe()
 
    def add_milk():
        cart.add_item("milk")
 
    t1 = threading.Thread(target=add_milk)
    t2 = threading.Thread(target=add_milk)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
 
    assert cart.get_item_count("milk") == 2  # ✅ PASS
    assert len(cart.items) == 1  # Один товар, не дубликат

Тестирование race conditions в pytest

Техника #1: Запускаем много раз

pip install pytest-repeat
pytest tests/test_race.py --count=100

Техника #2: Увеличиваем задержки

def increment():
    current = counter
    time.sleep(0.001)  # Больше задержка = чаще гонка
    counter = current + 1

Техника #3: Больше потоков

# Не 2, а 50 потоков
threads = [threading.Thread(target=increment) for _ in range(50)]

Типичные ошибки

Ошибка #1: Недостаточно итераций

# ❌ Запускаем 1 раз → может не проявиться
result = run_concurrent_operation()
assert result == expected

Исправление: используйте pytest-repeat:

pytest tests/test_race.py --count=100

Ошибка #2: Нет задержки

def increment():
    counter += 1  # ❌ Слишком быстро, гонка не проявляется

Исправление: добавьте time.sleep():

def increment():
    current = counter
    time.sleep(0.0001)  # Окно для гонки
    counter = current + 1

Ошибка #3: Мало потоков

# ❌ 2 потока — гонка редкая
threads = [threading.Thread(target=increment) for _ in range(2)]

Исправление: используйте 10-50 потоков.

Когда НЕ использовать threading

❌ Ваше приложение асинхронное (async/await)
❌ Вы работаете с asyncio, aiohttp, FastAPI
❌ У вас I/O-bound задачи (лучше async)
✅ CPU-bound задачи
✅ Legacy синхронный код
✅ Библиотеки без async поддержки

Альтернативы threading.Lock

1. concurrent.futures (проще)

from concurrent.futures import ThreadPoolExecutor
 
def process():
    # Ваша функция
    pass
 
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(process) for _ in range(100)]
    results = [f.result() for f in futures]

2. queue.Queue (для producer-consumer)

import queue
import threading
 
q = queue.Queue()
 
def worker():
    while True:
        item = q.get()
        if item is None:
            break
        process(item)
        q.task_done()
 
# Запускаем workers
threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
 
# Добавляем задачи
for item in items:
    q.put(item)
 
q.join()  # Ждем завершения всех задач

Следующий шаг: async

Если ваше приложение использует async/await, не используйте threading. Переходите к async-тестированию:

  1. Real Python: Async IO in Python
  2. Практикуйте 4+ часов с asyncio
  3. Затем: Pytest: Борьба с Race Conditions в Async-коде

Дополнительные материалы

Что вы узнали

  • Race condition — результат зависит от порядка операций
  • Flaky test — тест падает нестабильно из-за гонки
  • threading.Lock — исправляет гонки в многопоточном коде
  • Диагностика — нестабильный результат, зависит от потоков/timing
  • Тестирование — pytest-repeat, задержки, много потоков
Race Conditions в многопоточном коде: threading.Lock — Learning Center — Potapov.me