Перейти к содержимому

Async/await в Python: что происходит под капотом (и почему это важно знать)

Константин Потапов
35 min

Глубокое погружение в async/await Python: разбираем event loop, coroutines и futures на уровне байткода. Типичные ошибки blocking calls, debugging в production и честный ответ на вопрос «когда async — это overkill».

Три года назад я добавил async/await в проект "потому что это модно и быстро". Через месяц production падал каждые 3 часа с загадочным RuntimeError: Event loop is closed. CPU usage был выше, чем до миграции. А половина эндпоинтов висла по 30 секунд, потому что я не понимал разницу между await asyncio.sleep(1) и time.sleep(1).

Сегодня расскажу, что происходит под капотом async/await, чтобы вы не наступили на те же грабли. Без маркетинга "асинхронность делает всё быстрее". С честными кейсами, когда async — это костыль, а не решение.

🎯 TL;DR: Зачем читать эту статью

Вы прочитаете эту статью, если хотите:

  • Понимать, а не просто использовать async/await — разбор от байткода до event loop
  • Избегать классических ошибок: time.sleep() вместо asyncio.sleep(), requests вместо aiohttp
  • Диагностировать production проблемы: deadlock'и, утечки задач, блокирующие вызовы
  • Принимать решения: когда async — правильный выбор, а когда — overengineering
  • Получить практику: 15+ паттернов, антипаттернов и ready-to-use решений

Формат: Технический deep dive с примерами кода, диаграммами и честными кейсами из production.

Уровень: Middle+ разработчики, знакомые с базовым синтаксисом async/await.

Что такое async/await (и чем это НЕ является)

Async ≠ параллелизм

Главное заблуждение новичков (и моё на старте):

# ❌ Это НЕ сделает код быстрее автоматически
async def slow_computation():
    result = 0
    for i in range(10_000_000):  # CPU-bound операция
        result += i
    return result
 
# Этот код будет работать ТАКЖЕ медленно, как синхронный
await slow_computation()

Почему? Потому что async/await в Python — это кооперативная многозадачность (cooperative multitasking), а не параллельное выполнение. Код всё равно выполняется в одном потоке.

Аналогия: Представьте ресторан с одним поваром (event loop). Async позволяет ему начать готовить пасту, потом пока она варится — порезать салат, потом вернуться к пасте. Но в любой момент времени он делает только одну операцию. Параллелизм (multiprocessing) — это несколько поваров на кухне.

Что async/await РЕАЛЬНО делает

Async/await позволяет переключаться между задачами во время ожидания I/O:

import asyncio
import aiohttp
 
# ✅ Async имеет смысл для I/O-bound операций
async def fetch_user(user_id: int):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/users/{user_id}") as resp:
            return await resp.json()
 
# Пока ждём ответ от сервера №1, запрашиваем сервер №2
users = await asyncio.gather(
    fetch_user(1),  # Начинаем запрос 1
    fetch_user(2),  # Начинаем запрос 2 (не дожидаясь завершения запроса 1!)
    fetch_user(3),  # Начинаем запрос 3
)
 
# Синхронный аналог:
def fetch_user_sync(user_id: int):
    response = requests.get(f"https://api.example.com/users/{user_id}")
    return response.json()
 
# Выполняются последовательно: запрос 1 → ждём → запрос 2 → ждём → запрос 3
users = [fetch_user_sync(1), fetch_user_sync(2), fetch_user_sync(3)]

Визуализация разницы:

ПодходТаймлайн выполненияОбщее время
Синхронный[Запрос 1 ========] → [Запрос 2 ========] → [Запрос 3 ========]~300ms
Асинхронный[Запрос 1 ========]
[Запрос 2 ========]
[Запрос 3 ========]
~100ms

🚀 Performance Nugget: При трёх параллельных запросах async даёт 3x ускорение. При 10 запросах — 10x. При 100 — до 100x. Это работает только для I/O-bound операций!

Под капотом: от генераторов к async/await

Исторический контекст: async до async/await

До Python 3.5 асинхронность строилась на генераторах и yield from:

# Python 3.4 (asyncio с генераторами)
@asyncio.coroutine
def fetch_data():
    response = yield from aiohttp.request('GET', 'https://api.example.com')
    data = yield from response.json()
    return data

Это работало, но выглядело как костыль. Python 3.5 добавил синтаксический сахар async/await:

# Python 3.5+ (современный синтаксис)
async def fetch_data():
    response = await aiohttp.request('GET', 'https://api.example.com')
    data = await response.json()
    return data

Важно: Под капотом это всё ещё генераторы! async def создаёт coroutine object, который работает по принципу генераторов.

Что происходит при вызове async функции

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")
 
# Вызов async функции НЕ выполняет её код!
coro = hello()
print(type(coro))  # <class 'coroutine'>
 
# Код выполняется только при await
await coro  # Hello → пауза 1 сек → World

Под капотом:

# Упрощённый псевдокод того, что делает интерпретатор
def hello():
    """Генераторная версия async def hello()."""
    print("Hello")
    yield asyncio.sleep(1)  # yield — точка приостановки
    print("World")
 
# При вызове async функции:
# 1. Python создаёт coroutine object (аналог генератора)
# 2. Сохраняет состояние функции (локальные переменные, позицию выполнения)
# 3. Возвращает объект, который можно await'ить
 
coro = hello()  # Ничего не выполняется, только создаётся объект

Байткод async функции

Посмотрим, что происходит на уровне байткода:

import dis
 
async def example():
    await asyncio.sleep(1)
 
dis.dis(example)

Вывод (упрощённо):

  2           0 LOAD_GLOBAL              0 (asyncio)
              2 LOAD_METHOD              1 (sleep)
              4 LOAD_CONST               1 (1)
              6 CALL_METHOD              1
              8 GET_AWAITABLE            0  ← Ключевая инструкция!
             10 LOAD_CONST               0 (None)
             12 YIELD_FROM                  ← Генератор под капотом!
             14 POP_TOP
             16 LOAD_CONST               0 (None)
             18 RETURN_VALUE

GET_AWAITABLE и YIELD_FROM — доказательство, что async/await использует механизм генераторов.

Event Loop: дирижёр асинхронного оркестра

Что такое event loop

Event loop — это бесконечный цикл, который:

  1. Проверяет, какие задачи готовы к выполнению
  2. Выполняет готовые задачи до следующего await
  3. Переключается на другую задачу, если текущая ждёт I/O
  4. Повторяет цикл

Упрощённая реализация event loop:

class SimpleEventLoop:
    def __init__(self):
        self.tasks = []  # Очередь задач
        self.ready = []  # Готовые к выполнению задачи
 
    def create_task(self, coro):
        """Добавляет корутину в очередь."""
 
        task = Task(coro)
        self.tasks.append(task)
        return task
 
    def run_until_complete(self, coro):
        """Запускает event loop до завершения корутины."""
 
        task = self.create_task(coro)
 
        while not task.done():
            # 1. Собираем задачи, готовые к выполнению
            self.ready = [t for t in self.tasks if t.can_run()]
 
            # 2. Выполняем готовые задачи
            for task in self.ready:
                try:
                    # Делаем шаг выполнения (до следующего await)
                    task.step()
                except StopIteration:
                    # Корутина завершилась
                    task.set_done()
 
            # 3. Проверяем I/O события (сокеты, таймеры)
            self._poll_io()
 
        return task.result()
 
    def _poll_io(self):
        """Проверяет завершённые I/O операции."""
 
        # В реальном asyncio здесь select/epoll/kqueue
        # Для упрощения — просто пауза
        pass

Как работает await

async def main():
    print("Start")
    await asyncio.sleep(1)  # ← Что здесь происходит?
    print("End")

Под капотом:

# 1. asyncio.sleep(1) возвращает Future объект
future = asyncio.sleep(1)
 
# 2. await передаёт управление event loop'у
# "Я жду завершения future, можешь выполнять другие задачи"
# Event loop переключается на следующую задачу
 
# 3. Через 1 секунду event loop помечает future как завершённый
# И возвращает управление в точку await
 
# 4. Код продолжается после await
print("End")

Визуализация:

Создание и запуск event loop

import asyncio
 
# Вариант 1: asyncio.run() (Python 3.7+)
asyncio.run(main())  # Создаёт loop → выполняет → закрывает
 
# Вариант 2: Ручное управление (legacy)
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
 
# Вариант 3: Уже внутри async функции
async def inside_async():
    loop = asyncio.get_running_loop()  # Получить текущий loop
    # НЕ создавайте новый loop внутри async функции!

⚠️ Распространённая ошибка:

async def broken():
    asyncio.run(some_coro())  # ❌ RuntimeError: asyncio.run() cannot be called from a running event loop

Вы уже внутри event loop! Используйте await some_coro() вместо asyncio.run().

Корутины, Tasks и Futures: в чём разница

Coroutine (корутина)

async def fetch_user(user_id: int):
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": f"User {user_id}"}
 
# Вызов async функции создаёт coroutine object
coro = fetch_user(1)
print(type(coro))  # <class 'coroutine'>
 
# Корутина НЕ выполняется до await/create_task
await coro  # ← Вот здесь выполняется

Корутина — это объект, представляющий отложенное выполнение async функции.

Task (задача)

# Task оборачивает корутину для запуска в event loop
task = asyncio.create_task(fetch_user(1))
 
# Задача начинает выполняться СРАЗУ (в фоне)
# Можно сделать другую работу
print("Задача выполняется в фоне...")
 
# Дождаться результата
result = await task

Task — это обёртка над корутиной, которая запускается в event loop немедленно. Можно отменять, отслеживать статус, получать результат.

Разница:

# Корутина НЕ выполняется до await
coro = fetch_user(1)
await asyncio.sleep(5)  # fetch_user ещё не начал выполняться
result = await coro     # Только тут начинается выполнение
 
# Task выполняется СРАЗУ
task = asyncio.create_task(fetch_user(1))
await asyncio.sleep(5)  # fetch_user уже выполняется в фоне!
result = await task     # Просто получаем результат

Future (будущий результат)

# Future — это низкоуровневая абстракция для результата, который будет позже
future = asyncio.Future()
 
# Future в состоянии "pending"
print(future.done())  # False
 
# Устанавливаем результат (обычно делает event loop)
future.set_result(42)
 
# Future в состоянии "done"
print(future.done())  # True
result = await future  # 42

Future — это "обещание" (promise), что значение будет доступно в будущем. Task наследуется от Future.

Иерархия:

Future (базовый класс)
  └── Task (Future + управление корутиной)

Практический пример: разница в поведении

import asyncio
import time
 
async def slow_operation(n):
    print(f"[{time.time():.2f}] Start {n}")
    await asyncio.sleep(1)
    print(f"[{time.time():.2f}] End {n}")
    return n * 2
 
async def example_coroutines():
    """Корутины: выполняются последовательно."""
 
    print("=== Coroutines ===")
    result1 = await slow_operation(1)  # Ждём завершения
    result2 = await slow_operation(2)  # Потом второй
    return [result1, result2]
 
async def example_tasks():
    """Tasks: выполняются параллельно."""
 
    print("=== Tasks ===")
    task1 = asyncio.create_task(slow_operation(1))  # Запускаем сразу
    task2 = asyncio.create_task(slow_operation(2))  # Запускаем сразу
 
    # Оба выполняются одновременно
    result1 = await task1
    result2 = await task2
    return [result1, result2]
 
# Корутины: ~2 секунды (последовательно)
await example_coroutines()
# [1.00] Start 1
# [2.00] End 1
# [2.00] Start 2
# [3.00] End 2
 
# Tasks: ~1 секунда (параллельно)
await example_tasks()
# [1.00] Start 1
# [1.00] Start 2  ← Запустились одновременно!
# [2.00] End 1
# [2.00] End 2

Типичные ошибки: blocking calls в async коде

Ошибка №1: Использование синхронных библиотек

import asyncio
import requests  # ❌ Синхронная библиотека!
import time
 
async def fetch_users_broken():
    """Плохой пример: блокирует event loop."""
    users = []
    for user_id in range(10):
        # requests.get() блокирует весь event loop!
        response = requests.get(f"https://api.example.com/users/{user_id}")
        users.append(response.json())
    return users
 
# Пока выполняется requests.get(), event loop не может переключиться на другие задачи
# Весь async код стоит и ждёт

Почему это плохо:

async def main():
    # Создаём 100 задач
    tasks = [asyncio.create_task(fetch_users_broken()) for _ in range(100)]
 
    # Ожидаем завершения всех
    await asyncio.gather(*tasks)
 
# Ожидаемо: 100 задач выполняются параллельно → ~1 секунда
# Реально: 100 задач выполняются последовательно → ~100 секунд
# Потому что requests.get() блокирует event loop!

Правильно:

import aiohttp  # ✅ Асинхронная библиотека
 
async def fetch_users_correct():
    """Правильный пример: не блокирует event loop."""
 
    users = []
    async with aiohttp.ClientSession() as session:
        for user_id in range(10):
            # aiohttp поддерживает await — event loop может переключаться
            async with session.get(f"https://api.example.com/users/{user_id}") as resp:
                users.append(await resp.json())
    return users

Ошибка №2: time.sleep() вместо asyncio.sleep()

async def broken_delay():
    print("Start")
    time.sleep(1)  # ❌ Блокирует event loop на 1 секунду!
    print("End")
 
async def correct_delay():
    print("Start")
    await asyncio.sleep(1)  # ✅ Передаёт управление event loop'у
    print("End")

Демонстрация проблемы:

async def test_blocking():
    tasks = [
        asyncio.create_task(broken_delay()),
        asyncio.create_task(broken_delay()),
    ]
    await asyncio.gather(*tasks)
 
# Вывод:
# Start
# (пауза 1 сек — весь event loop заблокирован!)
# End
# Start
# (пауза 1 сек)
# End
# Итого: 2 секунды (последовательно)
 
async def test_non_blocking():
    tasks = [
        asyncio.create_task(correct_delay()),
        asyncio.create_task(correct_delay()),
    ]
    await asyncio.gather(*tasks)
 
# Вывод:
# Start
# Start
# (пауза 1 сек — обе задачи ждут параллельно)
# End
# End
# Итого: 1 секунда (параллельно)

Ошибка №3: Блокирующие операции с БД

import asyncio
import psycopg2  # ❌ Синхронный драйвер PostgreSQL
 
async def fetch_users_db_broken():
    """Плохо: блокирует event loop."""
 
    conn = psycopg2.connect("dbname=test")  # Блокирующее подключение
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")  # Блокирующий запрос
    users = cursor.fetchall()
    conn.close()
    return users

Правильно:

import asyncpg  # ✅ Асинхронный драйвер PostgreSQL
 
async def fetch_users_db_correct():
    """Правильно: не блокирует event loop."""
 
    conn = await asyncpg.connect("postgresql://localhost/test")
    users = await conn.fetch("SELECT * FROM users")
    await conn.close()
    return users

Ошибка №4: CPU-bound операции в async коде

async def process_data_broken(data):
    """Плохо: CPU-bound задача блокирует event loop."""
 
    result = []
    for item in data:
        # Тяжёлые вычисления (например, обработка изображений)
        processed = expensive_computation(item)  # Блокирует!
        result.append(processed)
    return result

Правильно: вынести в отдельный поток/процесс

import asyncio
from typing import Any
from concurrent.futures import ProcessPoolExecutor
 
# ✅ Вариант 1: asyncio.to_thread() — для I/O-bound блокирующих операций (Python 3.9+)
async def process_data_io_bound(data: list[Any]) -> list[Any]:
    """Современный способ для блокирующих I/O операций."""
 
    result = []
    for item in data:
        # Запускаем блокирующую функцию в отдельном потоке
        processed = await asyncio.to_thread(blocking_io_operation, item)
        result.append(processed)
    return result
 
def blocking_io_operation(item: Any) -> Any:
    """Синхронная функция с блокирующим I/O (например, requests.get)."""
 
    import time
    time.sleep(0.1)  # Имитация блокирующего I/O
    return item * 2
 
# ✅ Вариант 2: ProcessPoolExecutor — для CPU-bound операций
async def process_data_cpu_bound(data: list[Any]) -> list[Any]:
    """CPU-bound задача в отдельном процессе."""
 
    loop = asyncio.get_running_loop()
 
    # Выполняем в отдельном процессе
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool,
            expensive_computation_batch,  # Синхронная функция
            data
        )
    return result
 
def expensive_computation_batch(data: list[Any]) -> list[Any]:
    """Синхронная функция для выполнения в отдельном процессе."""
 
    return [expensive_computation(item) for item in data]
 
def expensive_computation(item: Any) -> Any:
    """Тяжёлые вычисления (CPU-bound)."""
 
    result = 0
    for i in range(1_000_000):
        result += i
    return result + item

💡 Pro Tip: Когда использовать что?

  • asyncio.to_thread() — для блокирующих I/O операций (requests, psycopg2, file I/O). Легковесно, использует ThreadPoolExecutor.
  • ProcessPoolExecutor — для CPU-bound задач (обработка изображений, математические вычисления). Реальный параллелизм.
  • Async библиотеки (aiohttp, asyncpg) — всегда лучше, если доступны. Не блокируют event loop вообще.

Как обнаружить blocking calls

Инструмент №1: aiodebug

import asyncio
import aiodebug
 
# Предупреждает, если какая-то операция блокирует event loop дольше 0.1 сек
aiodebug.log_slow_callbacks.enable(0.1)
 
async def main():
    time.sleep(0.2)  # ⚠️ Warning: Executing <function main> took 0.200 seconds
 
asyncio.run(main())

Инструмент №2: asyncio debug mode

import asyncio
import logging
 
# Включаем debug mode
asyncio.run(main(), debug=True)
 
# Или через переменную окружения
# PYTHONASYNCIODEBUG=1 python script.py

Что показывает debug mode:

  • Корутины, которые не были await'нуты (забытые)
  • Блокирующие вызовы, которые долго выполняются
  • Задачи, которые не были awaited

Инструмент №3: profiling

import asyncio
import cProfile
 
async def main():
    await some_async_function()
 
# Профилирование async кода
cProfile.run('asyncio.run(main())')

Debugging async кода в production

Проблема №1: "Event loop is closed"

# ❌ Частая ошибка при тестировании
import asyncio
import pytest
 
def test_async_function():
    result = asyncio.run(fetch_data())
    assert result == expected
 
    # Второй вызов
    result2 = asyncio.run(fetch_data())  # RuntimeError: Event loop is closed

Почему происходит:

asyncio.run() создаёт event loop → выполняет корутину → закрывает loop. Повторный вызов пытается использовать закрытый loop.

Решение:

# ✅ Используйте pytest-asyncio
import pytest
 
@pytest.mark.asyncio
async def test_async_function():
    result = await fetch_data()
    assert result == expected
 
    result2 = await fetch_data()  # Работает!

Проблема №2: Корутина не была awaited

async def fetch_data():
    await asyncio.sleep(0.1)
    return "data"
 
async def main():
    # ❌ Забыли await
    result = fetch_data()  # Создаёт coroutine object, но не выполняет!
    print(result)  # <coroutine object fetch_data at 0x...>
 
# Warning: coroutine 'fetch_data' was never awaited

Правильно:

async def main():
    result = await fetch_data()  # ✅
    print(result)  # "data"

Проблема №3: Deadlock из-за неправильного порядка await

async def task_a():
    print("A: waiting for B")
    await some_event.wait()  # Ждёт, пока task_b установит событие
    print("A: done")
 
async def task_b():
    print("B: waiting for A")
    await another_event.wait()  # Ждёт, пока task_a установит событие
    print("B: done")
 
# ❌ Deadlock: обе задачи ждут друг друга
await asyncio.gather(task_a(), task_b())

Debugging:

import asyncio
 
# Печатаем все запущенные задачи
async def debug_tasks():
    while True:
        tasks = asyncio.all_tasks()
        print(f"Running tasks: {len(tasks)}")
        for task in tasks:
            print(f"  - {task.get_name()}: {task.get_coro()}")
        await asyncio.sleep(5)
 
# Запускаем параллельно с основным кодом
asyncio.create_task(debug_tasks())

Проблема №4: Утечка задач (task leak)

async def create_background_task():
    # ❌ Задача создана, но никто не ждёт её завершения
    asyncio.create_task(background_worker())
    # При завершении программы задача будет убита
 
async def background_worker():
    while True:
        await process_queue()
        await asyncio.sleep(1)

Правильно:

async def create_background_task():
    # ✅ Сохраняем reference на задачу
    task = asyncio.create_task(background_worker())
 
    try:
        await main_logic()
    finally:
        # Отменяем background задачу при завершении
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

Инструменты для debugging в production

1. aiodebug + structlog

import asyncio
import structlog
from aiodebug import log_slow_callbacks
 
# Логируем медленные callback'и
log_slow_callbacks.enable(0.1)
 
logger = structlog.get_logger()
 
async def monitored_function():
    logger.info("started", function="monitored_function")
    await slow_operation()
    logger.info("completed", function="monitored_function")

2. aiomonitor — интерактивный debugger

import asyncio
from aiomonitor import start_monitor
 
async def main():
    # Запускаем monitor на порту 50101
    with start_monitor(loop=asyncio.get_running_loop()):
        await application_code()
 
# В другом терминале:
# $ nc localhost 50101
# > ps  # Список запущенных задач
# > where <task_id>  # Stacktrace задачи
# > cancel <task_id>  # Отменить задачу

3. Prometheus метрики

from prometheus_client import Histogram, Counter
import asyncio
 
# Метрики
task_duration = Histogram(
    'async_task_duration_seconds',
    'Time spent in async task',
    ['task_name']
)
 
task_errors = Counter(
    'async_task_errors_total',
    'Total errors in async tasks',
    ['task_name', 'error_type']
)
 
async def monitored_task(name: str):
    with task_duration.labels(task_name=name).time():
        try:
            await actual_work()
        except Exception as e:
            task_errors.labels(
                task_name=name,
                error_type=type(e).__name__
            ).inc()
            raise

4. OpenTelemetry для distributed tracing

from opentelemetry import trace
from opentelemetry.instrumentation.aiohttp import AioHttpClientInstrumentor
 
# Автоматическая инструментация aiohttp
AioHttpClientInstrumentor().instrument()
 
tracer = trace.get_tracer(__name__)
 
async def fetch_user(user_id: int):
    # Создаём span для трейсинга
    with tracer.start_as_current_span("fetch_user") as span:
        span.set_attribute("user_id", user_id)
 
        async with aiohttp.ClientSession() as session:
            async with session.get(f"/users/{user_id}") as resp:
                # Автоматически добавляет trace_id в заголовки
                return await resp.json()

Когда async — это overkill (честный разбор)

Сценарий №1: Простой CRUD API с малой нагрузкой

# ❌ Overkill для простого API
from fastapi import FastAPI
import asyncpg
 
app = FastAPI()
 
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    conn = await asyncpg.connect("postgresql://localhost/test")
    user = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    await conn.close()
    return user
 
# Нагрузка: 10 req/sec
# Сложность: высокая (async DB driver, async dependencies)
# Выгода: практически нет

Правильно для малой нагрузки:

# ✅ Простой синхронный код
from flask import Flask
import psycopg2
 
app = Flask(__name__)
 
@app.route("/users/<int:user_id>")
def get_user(user_id):
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    user = cursor.fetchone()
    conn.close()
    return user
 
# Нагрузка: 10 req/sec
# Сложность: низкая (знакомые инструменты)
# Выгода: простота поддержки

Когда переходить на async:

  • Нагрузка >100 req/sec с I/O операциями
  • Много параллельных внешних запросов (API, БД)
  • WebSocket соединения

Сценарий №2: CPU-bound задачи

# ❌ Async не поможет для CPU-bound
async def process_images_broken(images):
    """НЕ станет быстрее от async/await."""
 
    results = []
    for image in images:
        # CPU-bound операция — блокирует event loop
        processed = apply_filters(image)  # Тяжёлые вычисления
        results.append(processed)
    return results
 
# Время выполнения: ~10 секунд для 100 изображений

Правильно: используйте multiprocessing

# ✅ Параллельная обработка в нескольких процессах
from multiprocessing import Pool
 
def process_images_correct(images):
    """Реальный параллелизм для CPU-bound."""
 
    with Pool(processes=4) as pool:
        results = pool.map(apply_filters, images)
    return results
 
# Время выполнения: ~2.5 секунды (4x ускорение на 4-ядерном CPU)

Hybrid подход для I/O + CPU:

import asyncio
from concurrent.futures import ProcessPoolExecutor
 
async def download_and_process_images(urls):
    """I/O (скачивание) — async, CPU (обработка) — multiprocessing."""
 
    # 1. Скачиваем параллельно (async)
    images = await asyncio.gather(*[download_image(url) for url in urls])
 
    # 2. Обрабатываем в отдельных процессах (multiprocessing)
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        results = await loop.run_in_executor(
            pool,
            process_images_batch,
            images
        )
 
    return results

Сценарий №3: Legacy код с синхронными зависимостями

# ❌ Мучительная миграция
# У вас есть большая кодовая база на Flask/Django с синхронными библиотеками
 
import requests
import psycopg2
from old_library import SyncClient  # Синхронная библиотека без async версии
 
# Попытка добавить async:
async def new_endpoint():
    # Нужно либо:
    # 1. Заменить ВСЕ зависимости на async версии
    # 2. Или использовать run_in_executor() везде (костыль)
 
    loop = asyncio.get_running_loop()
 
    # Каждый синхронный вызов — отдельный executor
    response = await loop.run_in_executor(None, requests.get, url)
    db_result = await loop.run_in_executor(None, db_query, query)
    client_result = await loop.run_in_executor(None, SyncClient().fetch, data)
 
    # Код превращается в кашу

Когда async НЕ стоит того:

  • 50% кодовой базы — синхронные библиотеки

  • Нет async версий критичных зависимостей
  • Команда не знакома с async (learning curve высокий)

Альтернатива:

# ✅ Масштабируйте горизонтально
# Вместо миграции на async — запустите больше синхронных воркеров
 
# Gunicorn с 4 процессами
gunicorn app:app --workers 4
 
# Или используйте балансировщик нагрузки
# Nginx → 4 инстанса Flask приложения

Сценарий №4: Скрипты и CLI инструменты

# ❌ Overkill для простого скрипта
import asyncio
import aiohttp
 
async def fetch_all_users():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user(session, i) for i in range(100)]
        users = await asyncio.gather(*tasks)
    return users
 
async def fetch_user(session, user_id):
    async with session.get(f"/users/{user_id}") as resp:
        return await resp.json()
 
# Запускается один раз в день через cron
asyncio.run(fetch_all_users())

Правильно для одноразовых скриптов:

# ✅ Простой синхронный код
import requests
from concurrent.futures import ThreadPoolExecutor
 
def fetch_all_users():
    with ThreadPoolExecutor(max_workers=10) as executor:
        users = list(executor.map(fetch_user, range(100)))
    return users
 
def fetch_user(user_id):
    response = requests.get(f"/users/{user_id}")
    return response.json()
 
# Проще понять, проще отлаживать

Когда async действительно нужен

Используйте async когда:

  1. High-load I/O-bound приложения

    • Web API с >100 req/sec
    • WebSocket сервера с тысячами соединений
    • Микросервисы с множеством внешних запросов
  2. Много параллельных I/O операций

    • Агрегация данных из 10+ API
    • Batch обработка с HTTP запросами
    • Работа с очередями сообщений (Kafka, RabbitMQ)
  3. Realtime приложения

    • Chat серверы
    • Streaming данных
    • Live обновления через Server-Sent Events

НЕ используйте async когда:

  1. CPU-bound задачи (используйте multiprocessing)
  2. Малая нагрузка (<50 req/sec)
  3. Legacy код с синхронными зависимостями
  4. Простые скрипты и CLI инструменты
  5. Команда не готова к async (обучение + debugging дороже выгоды)

Практические паттерны

Паттерн №1: Semaphore для ограничения конкурентности

import asyncio
 
async def fetch_url(session, url, semaphore):
    # Ограничиваем количество одновременных запросов
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()
 
async def fetch_all(urls):
    # Максимум 10 одновременных запросов
    semaphore = asyncio.Semaphore(10)
 
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
 
    return results
 
# Без semaphore: 1000 одновременных соединений → сервер упадёт
# С semaphore: максимум 10 соединений → стабильная работа

Паттерн №2: Retry с exponential backoff

import asyncio
from typing import TypeVar, Callable
 
T = TypeVar('T')
 
async def retry_with_backoff(
    func: Callable[..., T],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    *args,
    **kwargs
) -> T:
    """Повторяет async функцию с экспоненциальной задержкой."""
 
    delay = initial_delay
 
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
 
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
            await asyncio.sleep(delay)
            delay *= 2  # Экспоненциальный рост задержки
 
# Использование
result = await retry_with_backoff(
    fetch_user,
    max_retries=5,
    initial_delay=1.0,
    user_id=123
)

Паттерн №3: Timeout для защиты от зависаний

import asyncio
 
async def fetch_with_timeout(url, timeout=5.0):
    try:
        async with asyncio.timeout(timeout):  # Python 3.11+
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    return await resp.json()
    except asyncio.TimeoutError:
        print(f"Request to {url} timed out after {timeout}s")
        return None
 
# Python 3.10 и ранее:
async def fetch_with_timeout_legacy(url, timeout=5.0):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await asyncio.wait_for(resp.json(), timeout=timeout)
    except asyncio.TimeoutError:
        print(f"Request to {url} timed out")
        return None

Паттерн №4: Graceful shutdown

import asyncio
import signal
 
class Application:
    def __init__(self):
        self.tasks = []
        self.shutdown_event = asyncio.Event()
 
    async def background_worker(self, name: str):
        """Background задача, которая корректно завершается."""
 
        print(f"Worker {name} started")
 
        try:
            while not self.shutdown_event.is_set():
                await self.do_work()
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print(f"Worker {name} cancelled")
            raise
        finally:
            print(f"Worker {name} cleanup")
 
    async def do_work(self):
        # Основная работа
        pass
 
    async def start(self):
        """Запускает приложение."""
 
        # Создаём background задачи
        self.tasks = [
            asyncio.create_task(self.background_worker(f"worker-{i}"))
            for i in range(5)
        ]
 
        # Ждём сигнала завершения
        await self.shutdown_event.wait()
 
        # Graceful shutdown
        await self.shutdown()
 
    async def shutdown(self):
        """Корректное завершение всех задач."""
 
        print("Shutting down...")
 
        # Отменяем все задачи
        for task in self.tasks:
            task.cancel()
 
        # Ждём завершения (обрабатываем CancelledError)
        results = await asyncio.gather(*self.tasks, return_exceptions=True)
 
        # Проверяем результаты
        for i, result in enumerate(results):
            if isinstance(result, asyncio.CancelledError):
                print(f"Task {i} was cancelled")
            elif isinstance(result, Exception):
                print(f"Task {i} failed: {result}")
 
        print("Shutdown complete")
 
    def handle_signal(self):
        """Обработчик SIGINT/SIGTERM."""
 
        print("Received shutdown signal")
        self.shutdown_event.set()
 
async def main():
    app = Application()
 
    # Регистрируем обработчики сигналов
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, app.handle_signal)
 
    await app.start()
 
asyncio.run(main())

📚 Шпаргалка: Async/Await в одной таблице

Ключевые концепции

  1. Async ≠ параллелизм — это кооперативная многозадачность в одном потоке
  2. await — точка, где event loop может переключиться на другую задачу
  3. Coroutine — отложенное выполнение, Task — запущенная задача, Future — обещание результата
  4. Event loop — дирижёр, который управляет выполнением задач

Частые ошибки и решения

❌ Проблема💥 Последствие✅ Решение🔧 Код
time.sleep() в async кодеБлокирует event loopasyncio.sleep()await asyncio.sleep(1)
Синхронные библиотеки (requests)Блокирует весь async кодAsync версииaiohttp, httpx, asyncpg
Забытый awaitКорутина не выполняетсяDebug modePYTHONASYNCIODEBUG=1
CPU-bound в asyncБлокирует event loopasyncio.to_thread() или ProcessPoolExecutorawait asyncio.to_thread(func)
asyncio.run() внутри asyncRuntimeErrorИспользуйте awaitawait coro() вместо asyncio.run(coro())
Blocking DB callsEvent loop простаиваетAsync драйверыasyncpg, motor, aiosqlite

Шпаргалка по библиотекам

Задача❌ Синхронная (блокирует)✅ Асинхронная📦 Установка
HTTP-клиентrequestsaiohttp или httpxpip install aiohttp
PostgreSQLpsycopg2asyncpgpip install asyncpg
MySQLpymysqlaiomysqlpip install aiomysql
MongoDBpymongomotorpip install motor
Redisredisaioredispip install aioredis
SQLitesqlite3aiosqlitepip install aiosqlite
File I/Oopen(), read()aiofilespip install aiofiles
Блокирующий кодЛюбая sync функцияasyncio.to_thread()Встроено (Python 3.9+)

Когда использовать async

Сценарий✅ Async❌ SyncПочему?
Web API (>100 req/s)Много параллельных I/O операций
WebSocket серверТысячи одновременных соединений
Микросервисы с внешними APIМножество HTTP запросов
Batch обработка данныхCPU-bound, нужен multiprocessing
Простой CRUD API (<50 req/s)Overhead async не окупается
Legacy проектМиграция дороже выгоды
CLI инструментыПростота важнее производительности

Production Checklist

Перед deploy в production:

  • Включён asyncio debug mode на staging: PYTHONASYNCIODEBUG=1
  • Используется aiodebug для обнаружения blocking calls (>100ms)
  • Добавлен мониторинг метрик: latency, throughput, event loop lag
  • Настроен distributed tracing (OpenTelemetry) для async задач
  • Реализован graceful shutdown с отменой задач
  • Добавлены timeout'ы для всех внешних запросов (5-30s)
  • Настроен Semaphore для ограничения конкурентности
  • Проверено отсутствие requests, psycopg2, time.sleep() в async коде
  • Добавлен retry с exponential backoff для нестабильных API
  • Настроены health-check эндпоинты для проверки event loop

Debugging в production

# 1. Включить debug mode
import asyncio
asyncio.run(main(), debug=True)
 
# 2. Логировать медленные callbacks
from aiodebug import log_slow_callbacks
log_slow_callbacks.enable(0.1)  # Предупреждение если >100ms
 
# 3. Мониторить запущенные задачи
tasks = asyncio.all_tasks()
print(f"Running: {len(tasks)} tasks")
 
# 4. Проверить stack trace задачи
for task in tasks:
    print(task.get_stack())

🎯 Ваши следующие шаги

Вы прочитали 25 минут технического контента. Теперь превратите знания в практику:

Шаг 1: Аудит текущего кода (15 минут)

# Найдите все блокирующие вызовы в async функциях
grep -r "requests\." --include="*.py" | grep "async def"
grep -r "time\.sleep" --include="*.py" | grep "async def"
grep -r "psycopg2" --include="*.py" | grep "async def"

Что искать:

  • requests.get() → заменить на aiohttp
  • time.sleep() → заменить на asyncio.sleep()
  • psycopg2 → заменить на asyncpg

Шаг 2: Включите debug mode (5 минут)

# В main файле
import asyncio
 
if __name__ == "__main__":
    asyncio.run(main(), debug=True)  # Добавьте debug=True

Или через переменную окружения:

PYTHONASYNCIODEBUG=1 python app.py

Результат: Вы увидите предупреждения о забытых await, медленных callback'ах и других проблемах.

Шаг 3: Обсудите с командой (30 минут)

Выберите один аргумент из раздела "Когда async — это overkill" и обсудите:

  • Подходит ли async для вашего проекта?
  • Какие части системы получат выгоду?
  • Какие риски миграции?

Шаг 4: Поделитесь опытом

Обнаружили что-то интересное? Пишите в комментариях:

  • Какой blocking call нашли в своём коде?
  • Какое ускорение получили после оптимизации?
  • С какой проблемой столкнулись при внедрении async?

Ваш кейс поможет другим разработчикам избежать похожих ошибок.


📚 Дополнительные материалы

Официальная документация:

Практические гайды:

Библиотеки:

  • aiohttp — async HTTP client/server
  • asyncpg — fastest PostgreSQL driver
  • httpx — modern HTTP client (sync + async)

P.S. Async/await — это не серебряная пуля, а инструмент для конкретных задач. Понимание внутреннего устройства — это ваше конкурентное преимущество. Используйте его с умом.