Три года назад я добавил async/await в проект "потому что это модно и быстро". Через месяц production падал каждые 3 часа с загадочным RuntimeError: Event loop is closed. CPU usage был выше, чем до миграции. А половина эндпоинтов висла по 30 секунд, потому что я не понимал разницу между await asyncio.sleep(1) и time.sleep(1).
Сегодня расскажу, что происходит под капотом async/await, чтобы вы не наступили на те же грабли. Без маркетинга "асинхронность делает всё быстрее". С честными кейсами, когда async — это костыль, а не решение.
🎯 TL;DR: Зачем читать эту статью
Вы прочитаете эту статью, если хотите:
- Понимать, а не просто использовать
async/await— разбор от байткода до event loop - Избегать классических ошибок:
time.sleep()вместоasyncio.sleep(),requestsвместоaiohttp - Диагностировать production проблемы: deadlock'и, утечки задач, блокирующие вызовы
- Принимать решения: когда async — правильный выбор, а когда — overengineering
- Получить практику: 15+ паттернов, антипаттернов и ready-to-use решений
Формат: Технический deep dive с примерами кода, диаграммами и честными кейсами из production.
Уровень: Middle+ разработчики, знакомые с базовым синтаксисом async/await.
Что такое async/await (и чем это НЕ является)
Async ≠ параллелизм
Главное заблуждение новичков (и моё на старте):
# ❌ Это НЕ сделает код быстрее автоматически
async def slow_computation():
result = 0
for i in range(10_000_000): # CPU-bound операция
result += i
return result
# Этот код будет работать ТАКЖЕ медленно, как синхронный
await slow_computation()Почему? Потому что async/await в Python — это кооперативная многозадачность (cooperative multitasking), а не параллельное выполнение. Код всё равно выполняется в одном потоке.
Аналогия: Представьте ресторан с одним поваром (event loop). Async позволяет ему начать готовить пасту, потом пока она варится — порезать салат, потом вернуться к пасте. Но в любой момент времени он делает только одну операцию. Параллелизм (multiprocessing) — это несколько поваров на кухне.
Что async/await РЕАЛЬНО делает
Async/await позволяет переключаться между задачами во время ожидания I/O:
import asyncio
import aiohttp
# ✅ Async имеет смысл для I/O-bound операций
async def fetch_user(user_id: int):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/users/{user_id}") as resp:
return await resp.json()
# Пока ждём ответ от сервера №1, запрашиваем сервер №2
users = await asyncio.gather(
fetch_user(1), # Начинаем запрос 1
fetch_user(2), # Начинаем запрос 2 (не дожидаясь завершения запроса 1!)
fetch_user(3), # Начинаем запрос 3
)
# Синхронный аналог:
def fetch_user_sync(user_id: int):
response = requests.get(f"https://api.example.com/users/{user_id}")
return response.json()
# Выполняются последовательно: запрос 1 → ждём → запрос 2 → ждём → запрос 3
users = [fetch_user_sync(1), fetch_user_sync(2), fetch_user_sync(3)]Визуализация разницы:
| Подход | Таймлайн выполнения | Общее время |
|---|---|---|
| Синхронный | [Запрос 1 ========] → [Запрос 2 ========] → [Запрос 3 ========] | ~300ms |
| Асинхронный | [Запрос 1 ========][Запрос 2 ========][Запрос 3 ========] | ~100ms |
🚀 Performance Nugget: При трёх параллельных запросах async даёт 3x ускорение. При 10 запросах — 10x. При 100 — до 100x. Это работает только для I/O-bound операций!
Под капотом: от генераторов к async/await
Исторический контекст: async до async/await
До Python 3.5 асинхронность строилась на генераторах и yield from:
# Python 3.4 (asyncio с генераторами)
@asyncio.coroutine
def fetch_data():
response = yield from aiohttp.request('GET', 'https://api.example.com')
data = yield from response.json()
return dataЭто работало, но выглядело как костыль. Python 3.5 добавил синтаксический сахар async/await:
# Python 3.5+ (современный синтаксис)
async def fetch_data():
response = await aiohttp.request('GET', 'https://api.example.com')
data = await response.json()
return dataВажно: Под капотом это всё ещё генераторы! async def создаёт coroutine object, который работает по принципу генераторов.
Что происходит при вызове async функции
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# Вызов async функции НЕ выполняет её код!
coro = hello()
print(type(coro)) # <class 'coroutine'>
# Код выполняется только при await
await coro # Hello → пауза 1 сек → WorldПод капотом:
# Упрощённый псевдокод того, что делает интерпретатор
def hello():
"""Генераторная версия async def hello()."""
print("Hello")
yield asyncio.sleep(1) # yield — точка приостановки
print("World")
# При вызове async функции:
# 1. Python создаёт coroutine object (аналог генератора)
# 2. Сохраняет состояние функции (локальные переменные, позицию выполнения)
# 3. Возвращает объект, который можно await'ить
coro = hello() # Ничего не выполняется, только создаётся объектБайткод async функции
Посмотрим, что происходит на уровне байткода:
import dis
async def example():
await asyncio.sleep(1)
dis.dis(example)Вывод (упрощённо):
2 0 LOAD_GLOBAL 0 (asyncio)
2 LOAD_METHOD 1 (sleep)
4 LOAD_CONST 1 (1)
6 CALL_METHOD 1
8 GET_AWAITABLE 0 ← Ключевая инструкция!
10 LOAD_CONST 0 (None)
12 YIELD_FROM ← Генератор под капотом!
14 POP_TOP
16 LOAD_CONST 0 (None)
18 RETURN_VALUE
GET_AWAITABLE и YIELD_FROM — доказательство, что async/await использует механизм генераторов.
Event Loop: дирижёр асинхронного оркестра
Что такое event loop
Event loop — это бесконечный цикл, который:
- Проверяет, какие задачи готовы к выполнению
- Выполняет готовые задачи до следующего
await - Переключается на другую задачу, если текущая ждёт I/O
- Повторяет цикл
Упрощённая реализация event loop:
class SimpleEventLoop:
def __init__(self):
self.tasks = [] # Очередь задач
self.ready = [] # Готовые к выполнению задачи
def create_task(self, coro):
"""Добавляет корутину в очередь."""
task = Task(coro)
self.tasks.append(task)
return task
def run_until_complete(self, coro):
"""Запускает event loop до завершения корутины."""
task = self.create_task(coro)
while not task.done():
# 1. Собираем задачи, готовые к выполнению
self.ready = [t for t in self.tasks if t.can_run()]
# 2. Выполняем готовые задачи
for task in self.ready:
try:
# Делаем шаг выполнения (до следующего await)
task.step()
except StopIteration:
# Корутина завершилась
task.set_done()
# 3. Проверяем I/O события (сокеты, таймеры)
self._poll_io()
return task.result()
def _poll_io(self):
"""Проверяет завершённые I/O операции."""
# В реальном asyncio здесь select/epoll/kqueue
# Для упрощения — просто пауза
passКак работает await
async def main():
print("Start")
await asyncio.sleep(1) # ← Что здесь происходит?
print("End")Под капотом:
# 1. asyncio.sleep(1) возвращает Future объект
future = asyncio.sleep(1)
# 2. await передаёт управление event loop'у
# "Я жду завершения future, можешь выполнять другие задачи"
# Event loop переключается на следующую задачу
# 3. Через 1 секунду event loop помечает future как завершённый
# И возвращает управление в точку await
# 4. Код продолжается после await
print("End")Визуализация:
Создание и запуск event loop
import asyncio
# Вариант 1: asyncio.run() (Python 3.7+)
asyncio.run(main()) # Создаёт loop → выполняет → закрывает
# Вариант 2: Ручное управление (legacy)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
# Вариант 3: Уже внутри async функции
async def inside_async():
loop = asyncio.get_running_loop() # Получить текущий loop
# НЕ создавайте новый loop внутри async функции!⚠️ Распространённая ошибка:
async def broken():
asyncio.run(some_coro()) # ❌ RuntimeError: asyncio.run() cannot be called from a running event loopВы уже внутри event loop! Используйте await some_coro() вместо asyncio.run().
Корутины, Tasks и Futures: в чём разница
Coroutine (корутина)
async def fetch_user(user_id: int):
await asyncio.sleep(0.1)
return {"id": user_id, "name": f"User {user_id}"}
# Вызов async функции создаёт coroutine object
coro = fetch_user(1)
print(type(coro)) # <class 'coroutine'>
# Корутина НЕ выполняется до await/create_task
await coro # ← Вот здесь выполняетсяКорутина — это объект, представляющий отложенное выполнение async функции.
Task (задача)
# Task оборачивает корутину для запуска в event loop
task = asyncio.create_task(fetch_user(1))
# Задача начинает выполняться СРАЗУ (в фоне)
# Можно сделать другую работу
print("Задача выполняется в фоне...")
# Дождаться результата
result = await taskTask — это обёртка над корутиной, которая запускается в event loop немедленно. Можно отменять, отслеживать статус, получать результат.
Разница:
# Корутина НЕ выполняется до await
coro = fetch_user(1)
await asyncio.sleep(5) # fetch_user ещё не начал выполняться
result = await coro # Только тут начинается выполнение
# Task выполняется СРАЗУ
task = asyncio.create_task(fetch_user(1))
await asyncio.sleep(5) # fetch_user уже выполняется в фоне!
result = await task # Просто получаем результатFuture (будущий результат)
# Future — это низкоуровневая абстракция для результата, который будет позже
future = asyncio.Future()
# Future в состоянии "pending"
print(future.done()) # False
# Устанавливаем результат (обычно делает event loop)
future.set_result(42)
# Future в состоянии "done"
print(future.done()) # True
result = await future # 42Future — это "обещание" (promise), что значение будет доступно в будущем. Task наследуется от Future.
Иерархия:
Future (базовый класс)
└── Task (Future + управление корутиной)
Практический пример: разница в поведении
import asyncio
import time
async def slow_operation(n):
print(f"[{time.time():.2f}] Start {n}")
await asyncio.sleep(1)
print(f"[{time.time():.2f}] End {n}")
return n * 2
async def example_coroutines():
"""Корутины: выполняются последовательно."""
print("=== Coroutines ===")
result1 = await slow_operation(1) # Ждём завершения
result2 = await slow_operation(2) # Потом второй
return [result1, result2]
async def example_tasks():
"""Tasks: выполняются параллельно."""
print("=== Tasks ===")
task1 = asyncio.create_task(slow_operation(1)) # Запускаем сразу
task2 = asyncio.create_task(slow_operation(2)) # Запускаем сразу
# Оба выполняются одновременно
result1 = await task1
result2 = await task2
return [result1, result2]
# Корутины: ~2 секунды (последовательно)
await example_coroutines()
# [1.00] Start 1
# [2.00] End 1
# [2.00] Start 2
# [3.00] End 2
# Tasks: ~1 секунда (параллельно)
await example_tasks()
# [1.00] Start 1
# [1.00] Start 2 ← Запустились одновременно!
# [2.00] End 1
# [2.00] End 2Типичные ошибки: blocking calls в async коде
Ошибка №1: Использование синхронных библиотек
import asyncio
import requests # ❌ Синхронная библиотека!
import time
async def fetch_users_broken():
"""Плохой пример: блокирует event loop."""
users = []
for user_id in range(10):
# requests.get() блокирует весь event loop!
response = requests.get(f"https://api.example.com/users/{user_id}")
users.append(response.json())
return users
# Пока выполняется requests.get(), event loop не может переключиться на другие задачи
# Весь async код стоит и ждётПочему это плохо:
async def main():
# Создаём 100 задач
tasks = [asyncio.create_task(fetch_users_broken()) for _ in range(100)]
# Ожидаем завершения всех
await asyncio.gather(*tasks)
# Ожидаемо: 100 задач выполняются параллельно → ~1 секунда
# Реально: 100 задач выполняются последовательно → ~100 секунд
# Потому что requests.get() блокирует event loop!Правильно:
import aiohttp # ✅ Асинхронная библиотека
async def fetch_users_correct():
"""Правильный пример: не блокирует event loop."""
users = []
async with aiohttp.ClientSession() as session:
for user_id in range(10):
# aiohttp поддерживает await — event loop может переключаться
async with session.get(f"https://api.example.com/users/{user_id}") as resp:
users.append(await resp.json())
return usersОшибка №2: time.sleep() вместо asyncio.sleep()
async def broken_delay():
print("Start")
time.sleep(1) # ❌ Блокирует event loop на 1 секунду!
print("End")
async def correct_delay():
print("Start")
await asyncio.sleep(1) # ✅ Передаёт управление event loop'у
print("End")Демонстрация проблемы:
async def test_blocking():
tasks = [
asyncio.create_task(broken_delay()),
asyncio.create_task(broken_delay()),
]
await asyncio.gather(*tasks)
# Вывод:
# Start
# (пауза 1 сек — весь event loop заблокирован!)
# End
# Start
# (пауза 1 сек)
# End
# Итого: 2 секунды (последовательно)
async def test_non_blocking():
tasks = [
asyncio.create_task(correct_delay()),
asyncio.create_task(correct_delay()),
]
await asyncio.gather(*tasks)
# Вывод:
# Start
# Start
# (пауза 1 сек — обе задачи ждут параллельно)
# End
# End
# Итого: 1 секунда (параллельно)Ошибка №3: Блокирующие операции с БД
import asyncio
import psycopg2 # ❌ Синхронный драйвер PostgreSQL
async def fetch_users_db_broken():
"""Плохо: блокирует event loop."""
conn = psycopg2.connect("dbname=test") # Блокирующее подключение
cursor = conn.cursor()
cursor.execute("SELECT * FROM users") # Блокирующий запрос
users = cursor.fetchall()
conn.close()
return usersПравильно:
import asyncpg # ✅ Асинхронный драйвер PostgreSQL
async def fetch_users_db_correct():
"""Правильно: не блокирует event loop."""
conn = await asyncpg.connect("postgresql://localhost/test")
users = await conn.fetch("SELECT * FROM users")
await conn.close()
return usersОшибка №4: CPU-bound операции в async коде
async def process_data_broken(data):
"""Плохо: CPU-bound задача блокирует event loop."""
result = []
for item in data:
# Тяжёлые вычисления (например, обработка изображений)
processed = expensive_computation(item) # Блокирует!
result.append(processed)
return resultПравильно: вынести в отдельный поток/процесс
import asyncio
from typing import Any
from concurrent.futures import ProcessPoolExecutor
# ✅ Вариант 1: asyncio.to_thread() — для I/O-bound блокирующих операций (Python 3.9+)
async def process_data_io_bound(data: list[Any]) -> list[Any]:
"""Современный способ для блокирующих I/O операций."""
result = []
for item in data:
# Запускаем блокирующую функцию в отдельном потоке
processed = await asyncio.to_thread(blocking_io_operation, item)
result.append(processed)
return result
def blocking_io_operation(item: Any) -> Any:
"""Синхронная функция с блокирующим I/O (например, requests.get)."""
import time
time.sleep(0.1) # Имитация блокирующего I/O
return item * 2
# ✅ Вариант 2: ProcessPoolExecutor — для CPU-bound операций
async def process_data_cpu_bound(data: list[Any]) -> list[Any]:
"""CPU-bound задача в отдельном процессе."""
loop = asyncio.get_running_loop()
# Выполняем в отдельном процессе
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool,
expensive_computation_batch, # Синхронная функция
data
)
return result
def expensive_computation_batch(data: list[Any]) -> list[Any]:
"""Синхронная функция для выполнения в отдельном процессе."""
return [expensive_computation(item) for item in data]
def expensive_computation(item: Any) -> Any:
"""Тяжёлые вычисления (CPU-bound)."""
result = 0
for i in range(1_000_000):
result += i
return result + item💡 Pro Tip: Когда использовать что?
asyncio.to_thread()— для блокирующих I/O операций (requests, psycopg2, file I/O). Легковесно, использует ThreadPoolExecutor.ProcessPoolExecutor— для CPU-bound задач (обработка изображений, математические вычисления). Реальный параллелизм.- Async библиотеки (aiohttp, asyncpg) — всегда лучше, если доступны. Не блокируют event loop вообще.
Как обнаружить blocking calls
Инструмент №1: aiodebug
import asyncio
import aiodebug
# Предупреждает, если какая-то операция блокирует event loop дольше 0.1 сек
aiodebug.log_slow_callbacks.enable(0.1)
async def main():
time.sleep(0.2) # ⚠️ Warning: Executing <function main> took 0.200 seconds
asyncio.run(main())Инструмент №2: asyncio debug mode
import asyncio
import logging
# Включаем debug mode
asyncio.run(main(), debug=True)
# Или через переменную окружения
# PYTHONASYNCIODEBUG=1 python script.pyЧто показывает debug mode:
- Корутины, которые не были await'нуты (забытые)
- Блокирующие вызовы, которые долго выполняются
- Задачи, которые не были awaited
Инструмент №3: profiling
import asyncio
import cProfile
async def main():
await some_async_function()
# Профилирование async кода
cProfile.run('asyncio.run(main())')Debugging async кода в production
Проблема №1: "Event loop is closed"
# ❌ Частая ошибка при тестировании
import asyncio
import pytest
def test_async_function():
result = asyncio.run(fetch_data())
assert result == expected
# Второй вызов
result2 = asyncio.run(fetch_data()) # RuntimeError: Event loop is closedПочему происходит:
asyncio.run() создаёт event loop → выполняет корутину → закрывает loop. Повторный вызов пытается использовать закрытый loop.
Решение:
# ✅ Используйте pytest-asyncio
import pytest
@pytest.mark.asyncio
async def test_async_function():
result = await fetch_data()
assert result == expected
result2 = await fetch_data() # Работает!Проблема №2: Корутина не была awaited
async def fetch_data():
await asyncio.sleep(0.1)
return "data"
async def main():
# ❌ Забыли await
result = fetch_data() # Создаёт coroutine object, но не выполняет!
print(result) # <coroutine object fetch_data at 0x...>
# Warning: coroutine 'fetch_data' was never awaitedПравильно:
async def main():
result = await fetch_data() # ✅
print(result) # "data"Проблема №3: Deadlock из-за неправильного порядка await
async def task_a():
print("A: waiting for B")
await some_event.wait() # Ждёт, пока task_b установит событие
print("A: done")
async def task_b():
print("B: waiting for A")
await another_event.wait() # Ждёт, пока task_a установит событие
print("B: done")
# ❌ Deadlock: обе задачи ждут друг друга
await asyncio.gather(task_a(), task_b())Debugging:
import asyncio
# Печатаем все запущенные задачи
async def debug_tasks():
while True:
tasks = asyncio.all_tasks()
print(f"Running tasks: {len(tasks)}")
for task in tasks:
print(f" - {task.get_name()}: {task.get_coro()}")
await asyncio.sleep(5)
# Запускаем параллельно с основным кодом
asyncio.create_task(debug_tasks())Проблема №4: Утечка задач (task leak)
async def create_background_task():
# ❌ Задача создана, но никто не ждёт её завершения
asyncio.create_task(background_worker())
# При завершении программы задача будет убита
async def background_worker():
while True:
await process_queue()
await asyncio.sleep(1)Правильно:
async def create_background_task():
# ✅ Сохраняем reference на задачу
task = asyncio.create_task(background_worker())
try:
await main_logic()
finally:
# Отменяем background задачу при завершении
task.cancel()
try:
await task
except asyncio.CancelledError:
passИнструменты для debugging в production
1. aiodebug + structlog
import asyncio
import structlog
from aiodebug import log_slow_callbacks
# Логируем медленные callback'и
log_slow_callbacks.enable(0.1)
logger = structlog.get_logger()
async def monitored_function():
logger.info("started", function="monitored_function")
await slow_operation()
logger.info("completed", function="monitored_function")2. aiomonitor — интерактивный debugger
import asyncio
from aiomonitor import start_monitor
async def main():
# Запускаем monitor на порту 50101
with start_monitor(loop=asyncio.get_running_loop()):
await application_code()
# В другом терминале:
# $ nc localhost 50101
# > ps # Список запущенных задач
# > where <task_id> # Stacktrace задачи
# > cancel <task_id> # Отменить задачу3. Prometheus метрики
from prometheus_client import Histogram, Counter
import asyncio
# Метрики
task_duration = Histogram(
'async_task_duration_seconds',
'Time spent in async task',
['task_name']
)
task_errors = Counter(
'async_task_errors_total',
'Total errors in async tasks',
['task_name', 'error_type']
)
async def monitored_task(name: str):
with task_duration.labels(task_name=name).time():
try:
await actual_work()
except Exception as e:
task_errors.labels(
task_name=name,
error_type=type(e).__name__
).inc()
raise4. OpenTelemetry для distributed tracing
from opentelemetry import trace
from opentelemetry.instrumentation.aiohttp import AioHttpClientInstrumentor
# Автоматическая инструментация aiohttp
AioHttpClientInstrumentor().instrument()
tracer = trace.get_tracer(__name__)
async def fetch_user(user_id: int):
# Создаём span для трейсинга
with tracer.start_as_current_span("fetch_user") as span:
span.set_attribute("user_id", user_id)
async with aiohttp.ClientSession() as session:
async with session.get(f"/users/{user_id}") as resp:
# Автоматически добавляет trace_id в заголовки
return await resp.json()Когда async — это overkill (честный разбор)
Сценарий №1: Простой CRUD API с малой нагрузкой
# ❌ Overkill для простого API
from fastapi import FastAPI
import asyncpg
app = FastAPI()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
conn = await asyncpg.connect("postgresql://localhost/test")
user = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
await conn.close()
return user
# Нагрузка: 10 req/sec
# Сложность: высокая (async DB driver, async dependencies)
# Выгода: практически нетПравильно для малой нагрузки:
# ✅ Простой синхронный код
from flask import Flask
import psycopg2
app = Flask(__name__)
@app.route("/users/<int:user_id>")
def get_user(user_id):
conn = psycopg2.connect("dbname=test")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
conn.close()
return user
# Нагрузка: 10 req/sec
# Сложность: низкая (знакомые инструменты)
# Выгода: простота поддержкиКогда переходить на async:
- Нагрузка >100 req/sec с I/O операциями
- Много параллельных внешних запросов (API, БД)
- WebSocket соединения
Сценарий №2: CPU-bound задачи
# ❌ Async не поможет для CPU-bound
async def process_images_broken(images):
"""НЕ станет быстрее от async/await."""
results = []
for image in images:
# CPU-bound операция — блокирует event loop
processed = apply_filters(image) # Тяжёлые вычисления
results.append(processed)
return results
# Время выполнения: ~10 секунд для 100 изображенийПравильно: используйте multiprocessing
# ✅ Параллельная обработка в нескольких процессах
from multiprocessing import Pool
def process_images_correct(images):
"""Реальный параллелизм для CPU-bound."""
with Pool(processes=4) as pool:
results = pool.map(apply_filters, images)
return results
# Время выполнения: ~2.5 секунды (4x ускорение на 4-ядерном CPU)Hybrid подход для I/O + CPU:
import asyncio
from concurrent.futures import ProcessPoolExecutor
async def download_and_process_images(urls):
"""I/O (скачивание) — async, CPU (обработка) — multiprocessing."""
# 1. Скачиваем параллельно (async)
images = await asyncio.gather(*[download_image(url) for url in urls])
# 2. Обрабатываем в отдельных процессах (multiprocessing)
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
results = await loop.run_in_executor(
pool,
process_images_batch,
images
)
return resultsСценарий №3: Legacy код с синхронными зависимостями
# ❌ Мучительная миграция
# У вас есть большая кодовая база на Flask/Django с синхронными библиотеками
import requests
import psycopg2
from old_library import SyncClient # Синхронная библиотека без async версии
# Попытка добавить async:
async def new_endpoint():
# Нужно либо:
# 1. Заменить ВСЕ зависимости на async версии
# 2. Или использовать run_in_executor() везде (костыль)
loop = asyncio.get_running_loop()
# Каждый синхронный вызов — отдельный executor
response = await loop.run_in_executor(None, requests.get, url)
db_result = await loop.run_in_executor(None, db_query, query)
client_result = await loop.run_in_executor(None, SyncClient().fetch, data)
# Код превращается в кашуКогда async НЕ стоит того:
-
50% кодовой базы — синхронные библиотеки
- Нет async версий критичных зависимостей
- Команда не знакома с async (learning curve высокий)
Альтернатива:
# ✅ Масштабируйте горизонтально
# Вместо миграции на async — запустите больше синхронных воркеров
# Gunicorn с 4 процессами
gunicorn app:app --workers 4
# Или используйте балансировщик нагрузки
# Nginx → 4 инстанса Flask приложенияСценарий №4: Скрипты и CLI инструменты
# ❌ Overkill для простого скрипта
import asyncio
import aiohttp
async def fetch_all_users():
async with aiohttp.ClientSession() as session:
tasks = [fetch_user(session, i) for i in range(100)]
users = await asyncio.gather(*tasks)
return users
async def fetch_user(session, user_id):
async with session.get(f"/users/{user_id}") as resp:
return await resp.json()
# Запускается один раз в день через cron
asyncio.run(fetch_all_users())Правильно для одноразовых скриптов:
# ✅ Простой синхронный код
import requests
from concurrent.futures import ThreadPoolExecutor
def fetch_all_users():
with ThreadPoolExecutor(max_workers=10) as executor:
users = list(executor.map(fetch_user, range(100)))
return users
def fetch_user(user_id):
response = requests.get(f"/users/{user_id}")
return response.json()
# Проще понять, проще отлаживатьКогда async действительно нужен
✅ Используйте async когда:
-
High-load I/O-bound приложения
- Web API с >100 req/sec
- WebSocket сервера с тысячами соединений
- Микросервисы с множеством внешних запросов
-
Много параллельных I/O операций
- Агрегация данных из 10+ API
- Batch обработка с HTTP запросами
- Работа с очередями сообщений (Kafka, RabbitMQ)
-
Realtime приложения
- Chat серверы
- Streaming данных
- Live обновления через Server-Sent Events
❌ НЕ используйте async когда:
- CPU-bound задачи (используйте multiprocessing)
- Малая нагрузка (<50 req/sec)
- Legacy код с синхронными зависимостями
- Простые скрипты и CLI инструменты
- Команда не готова к async (обучение + debugging дороже выгоды)
Практические паттерны
Паттерн №1: Semaphore для ограничения конкурентности
import asyncio
async def fetch_url(session, url, semaphore):
# Ограничиваем количество одновременных запросов
async with semaphore:
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls):
# Максимум 10 одновременных запросов
semaphore = asyncio.Semaphore(10)
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Без semaphore: 1000 одновременных соединений → сервер упадёт
# С semaphore: максимум 10 соединений → стабильная работаПаттерн №2: Retry с exponential backoff
import asyncio
from typing import TypeVar, Callable
T = TypeVar('T')
async def retry_with_backoff(
func: Callable[..., T],
max_retries: int = 3,
initial_delay: float = 1.0,
*args,
**kwargs
) -> T:
"""Повторяет async функцию с экспоненциальной задержкой."""
delay = initial_delay
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
await asyncio.sleep(delay)
delay *= 2 # Экспоненциальный рост задержки
# Использование
result = await retry_with_backoff(
fetch_user,
max_retries=5,
initial_delay=1.0,
user_id=123
)Паттерн №3: Timeout для защиты от зависаний
import asyncio
async def fetch_with_timeout(url, timeout=5.0):
try:
async with asyncio.timeout(timeout): # Python 3.11+
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
except asyncio.TimeoutError:
print(f"Request to {url} timed out after {timeout}s")
return None
# Python 3.10 и ранее:
async def fetch_with_timeout_legacy(url, timeout=5.0):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await asyncio.wait_for(resp.json(), timeout=timeout)
except asyncio.TimeoutError:
print(f"Request to {url} timed out")
return NoneПаттерн №4: Graceful shutdown
import asyncio
import signal
class Application:
def __init__(self):
self.tasks = []
self.shutdown_event = asyncio.Event()
async def background_worker(self, name: str):
"""Background задача, которая корректно завершается."""
print(f"Worker {name} started")
try:
while not self.shutdown_event.is_set():
await self.do_work()
await asyncio.sleep(1)
except asyncio.CancelledError:
print(f"Worker {name} cancelled")
raise
finally:
print(f"Worker {name} cleanup")
async def do_work(self):
# Основная работа
pass
async def start(self):
"""Запускает приложение."""
# Создаём background задачи
self.tasks = [
asyncio.create_task(self.background_worker(f"worker-{i}"))
for i in range(5)
]
# Ждём сигнала завершения
await self.shutdown_event.wait()
# Graceful shutdown
await self.shutdown()
async def shutdown(self):
"""Корректное завершение всех задач."""
print("Shutting down...")
# Отменяем все задачи
for task in self.tasks:
task.cancel()
# Ждём завершения (обрабатываем CancelledError)
results = await asyncio.gather(*self.tasks, return_exceptions=True)
# Проверяем результаты
for i, result in enumerate(results):
if isinstance(result, asyncio.CancelledError):
print(f"Task {i} was cancelled")
elif isinstance(result, Exception):
print(f"Task {i} failed: {result}")
print("Shutdown complete")
def handle_signal(self):
"""Обработчик SIGINT/SIGTERM."""
print("Received shutdown signal")
self.shutdown_event.set()
async def main():
app = Application()
# Регистрируем обработчики сигналов
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, app.handle_signal)
await app.start()
asyncio.run(main())📚 Шпаргалка: Async/Await в одной таблице
Ключевые концепции
- Async ≠ параллелизм — это кооперативная многозадачность в одном потоке
- await — точка, где event loop может переключиться на другую задачу
- Coroutine — отложенное выполнение, Task — запущенная задача, Future — обещание результата
- Event loop — дирижёр, который управляет выполнением задач
Частые ошибки и решения
| ❌ Проблема | 💥 Последствие | ✅ Решение | 🔧 Код |
|---|---|---|---|
time.sleep() в async коде | Блокирует event loop | asyncio.sleep() | await asyncio.sleep(1) |
Синхронные библиотеки (requests) | Блокирует весь async код | Async версии | aiohttp, httpx, asyncpg |
Забытый await | Корутина не выполняется | Debug mode | PYTHONASYNCIODEBUG=1 |
| CPU-bound в async | Блокирует event loop | asyncio.to_thread() или ProcessPoolExecutor | await asyncio.to_thread(func) |
asyncio.run() внутри async | RuntimeError | Используйте await | await coro() вместо asyncio.run(coro()) |
| Blocking DB calls | Event loop простаивает | Async драйверы | asyncpg, motor, aiosqlite |
Шпаргалка по библиотекам
| Задача | ❌ Синхронная (блокирует) | ✅ Асинхронная | 📦 Установка |
|---|---|---|---|
| HTTP-клиент | requests | aiohttp или httpx | pip install aiohttp |
| PostgreSQL | psycopg2 | asyncpg | pip install asyncpg |
| MySQL | pymysql | aiomysql | pip install aiomysql |
| MongoDB | pymongo | motor | pip install motor |
| Redis | redis | aioredis | pip install aioredis |
| SQLite | sqlite3 | aiosqlite | pip install aiosqlite |
| File I/O | open(), read() | aiofiles | pip install aiofiles |
| Блокирующий код | Любая sync функция | asyncio.to_thread() | Встроено (Python 3.9+) |
Когда использовать async
| Сценарий | ✅ Async | ❌ Sync | Почему? |
|---|---|---|---|
| Web API (>100 req/s) | ✅ | — | Много параллельных I/O операций |
| WebSocket сервер | ✅ | — | Тысячи одновременных соединений |
| Микросервисы с внешними API | ✅ | — | Множество HTTP запросов |
| Batch обработка данных | — | ✅ | CPU-bound, нужен multiprocessing |
| Простой CRUD API (<50 req/s) | — | ✅ | Overhead async не окупается |
| Legacy проект | — | ✅ | Миграция дороже выгоды |
| CLI инструменты | — | ✅ | Простота важнее производительности |
Production Checklist
Перед deploy в production:
- Включён
asynciodebug mode на staging:PYTHONASYNCIODEBUG=1 - Используется
aiodebugдля обнаружения blocking calls (>100ms) - Добавлен мониторинг метрик: latency, throughput, event loop lag
- Настроен distributed tracing (OpenTelemetry) для async задач
- Реализован graceful shutdown с отменой задач
- Добавлены timeout'ы для всех внешних запросов (5-30s)
- Настроен Semaphore для ограничения конкурентности
- Проверено отсутствие
requests,psycopg2,time.sleep()в async коде - Добавлен retry с exponential backoff для нестабильных API
- Настроены health-check эндпоинты для проверки event loop
Debugging в production
# 1. Включить debug mode
import asyncio
asyncio.run(main(), debug=True)
# 2. Логировать медленные callbacks
from aiodebug import log_slow_callbacks
log_slow_callbacks.enable(0.1) # Предупреждение если >100ms
# 3. Мониторить запущенные задачи
tasks = asyncio.all_tasks()
print(f"Running: {len(tasks)} tasks")
# 4. Проверить stack trace задачи
for task in tasks:
print(task.get_stack())🎯 Ваши следующие шаги
Вы прочитали 25 минут технического контента. Теперь превратите знания в практику:
Шаг 1: Аудит текущего кода (15 минут)
# Найдите все блокирующие вызовы в async функциях
grep -r "requests\." --include="*.py" | grep "async def"
grep -r "time\.sleep" --include="*.py" | grep "async def"
grep -r "psycopg2" --include="*.py" | grep "async def"Что искать:
requests.get()→ заменить наaiohttptime.sleep()→ заменить наasyncio.sleep()psycopg2→ заменить наasyncpg
Шаг 2: Включите debug mode (5 минут)
# В main файле
import asyncio
if __name__ == "__main__":
asyncio.run(main(), debug=True) # Добавьте debug=TrueИли через переменную окружения:
PYTHONASYNCIODEBUG=1 python app.pyРезультат: Вы увидите предупреждения о забытых await, медленных callback'ах и других проблемах.
Шаг 3: Обсудите с командой (30 минут)
Выберите один аргумент из раздела "Когда async — это overkill" и обсудите:
- Подходит ли async для вашего проекта?
- Какие части системы получат выгоду?
- Какие риски миграции?
Шаг 4: Поделитесь опытом
Обнаружили что-то интересное? Пишите в комментариях:
- Какой blocking call нашли в своём коде?
- Какое ускорение получили после оптимизации?
- С какой проблемой столкнулись при внедрении async?
Ваш кейс поможет другим разработчикам избежать похожих ошибок.
📚 Дополнительные материалы
Официальная документация:
- PEP 492 — спецификация async/await
- asyncio documentation — полная документация
Практические гайды:
- Real Python: Async IO in Python — hands-on туториал
- Python Asyncio: The Complete Guide — comprehensive resource
Библиотеки:
- aiohttp — async HTTP client/server
- asyncpg — fastest PostgreSQL driver
- httpx — modern HTTP client (sync + async)
P.S. Async/await — это не серебряная пуля, а инструмент для конкретных задач. Понимание внутреннего устройства — это ваше конкурентное преимущество. Используйте его с умом.
