Учебник по asyncio в Python
Полное руководство по асинхронному программированию в Python с использованием библиотеки asyncio — от основ до продвинутых паттернов
Table of Contents
Учебник по asyncio в Python
Ключевой принцип: Asyncio оптимизирует время простоя при I/O операциях, позволяя одному потоку обрабатывать тысячи одновременных подключений. Не подходит для CPU-интенсивных вычислений.
Как пользоваться материалом
Уровень: Intermediate - Advanced
Требования: Знание Python 3.10+, понимание ООП, опыт с сетевыми приложениями
Структура материала:
- Основы — синтаксис, event loop, корутины
- Продвинутые паттерны — синхронизация, очереди, graceful shutdown
- Production — debugging, тестирование, мониторинг, deployment
- Сквозной пример — создание production-ready веб-сервиса с нуля
Критические правила:
- ✅ Всегда используйте type hints
- ✅ Обрабатывайте все исключения в задачах
- ✅
asyncio.run()только в точке входа приложения - ❌ Никогда не блокируйте event loop синхронным кодом
- ❌ Не создавайте новый event loop внутри уже работающего
Введение в асинхронное программирование
Асинхронное программирование — это парадигма, при которой программа не ждёт завершения долгих операций (сетевые запросы, чтение файлов, обращение к БД), а продолжает выполнять другую работу. Когда операция завершается, программа возвращается к ней и обрабатывает результат.
Зачем это нужно?
Представьте веб-сервер, который обрабатывает 1000 запросов одновременно. Каждый запрос ждёт ответа от базы данных ~100ms.
Синхронный подход:
- Обрабатываем запросы по очереди
- Время обработки: 1000 × 100ms = 100 секунд ⚠️
- Пока ждём БД, процессор простаивает
Асинхронный подход:
- Запускаем все запросы параллельно
- Пока ждём БД, обрабатываем другие запросы
- Время обработки: ~100ms ✅
- Процессор всегда занят полезной работой
Сравнение подходов
Синхронный подход:
- Операции выполняются строго последовательно
- Подходит для: простые скрипты, вычисления
- Простота в отладке
Многопоточный подход (threading):
- Операции выполняются в разных потоках ОС
- ⚠️ В Python: Из-за GIL (Global Interpreter Lock) НЕ подходит для CPU-bound задач!
- Подходит для: I/O-bound задачи с блокирующими библиотеками
- Для CPU-bound используйте
multiprocessing
Многопроцессный подход (multiprocessing):
- Операции выполняются в разных процессах
- Подходит для: CPU-bound задачи (вычисления, обработка данных)
- Использует несколько ядер процессора
- Обходит ограничение GIL
Асинхронный подход:
- Один поток переключается между задачами
- Подходит для: I/O-bound задачи (сеть, файлы, БД)
- Эффективное использование времени ожидания
Важно: Asyncio не ускоряет вычисления! Он эффективен только для задач, где большую часть времени программа чего-то ждёт (I/O).
Основы asyncio
Критически важно:
async defобъявляет корутину, которая должна быть вызвана сawaittime.sleep()блокирует event loop — используйте толькоasyncio.sleep()asyncio.run()создает новый event loop — вызывайте ТОЛЬКО в точке входа (main)- Забытый
awaitвернет coroutine объект вместо результата
Первый пример: в чём разница?
import asyncio
import time
from typing import List
# ❌ Синхронная версия
def sync_download(file_id: int) -> str:
"""Блокирующая загрузка файла."""
print(f"[{file_id}] Начало загрузки")
time.sleep(2) # ⚠️ Блокирует весь процесс!
print(f"[{file_id}] Загрузка завершена")
return f"Данные {file_id}"
# ✅ Асинхронная версия
async def async_download(file_id: int) -> str:
"""Асинхронная загрузка файла."""
print(f"[{file_id}] Начало загрузки")
await asyncio.sleep(2) # ✅ Освобождает event loop
print(f"[{file_id}] Загрузка завершена")
return f"Данные {file_id}"
# Сравнение производительности
async def compare() -> None:
"""Демонстрация разницы между синхронным и асинхронным подходами."""
# Синхронно: загружаем 3 файла по очереди
start = time.time()
results_sync: List[str] = []
for i in range(3):
results_sync.append(sync_download(i))
print(f"Синхронно: {time.time() - start:.1f} сек") # ~6 секунд
print(f"Результаты: {results_sync}\n")
# Асинхронно: загружаем 3 файла одновременно
start = time.time()
results_async: List[str] = await asyncio.gather(
async_download(0),
async_download(1),
async_download(2)
)
print(f"Асинхронно: {time.time() - start:.1f} сек") # ~2 секунды!
print(f"Результаты: {results_async}")
if __name__ == "__main__":
asyncio.run(compare())Результат:
Синхронно: 6.0 сек
Асинхронно: 2.0 сек ← в 3 раза быстрее!
Ключевые концепции
1. async def — объявление корутины
from typing import Coroutine
async def my_coroutine() -> str:
"""Корутина — функция, которая может приостанавливаться на await."""
return "result"
# ❌ ОШИБКА: вызов без await возвращает coroutine объект
coro: Coroutine = my_coroutine() # <coroutine object my_coroutine>
# ✅ ПРАВИЛЬНО: await получает результат
result: str = await my_coroutine() # "result"2. await — ожидание асинхронной операции
import asyncio
async def fetch_data() -> dict:
# Приостанавливаем выполнение текущей корутины
# Event loop переключается на другие задачи
await asyncio.sleep(1)
return {"status": "ok"}
# Пока ждём sleep(1), event loop выполняет другие корутины3. asyncio.run() — запуск главной корутины
async def main() -> None:
"""Точка входа в асинхронное приложение."""
result = await my_coroutine()
print(result)
# ✅ Вызывается ТОЛЬКО в __main__
if __name__ == "__main__":
asyncio.run(main())
# ❌ НИКОГДА не вызывайте asyncio.run() внутри async функции!
async def wrong() -> None:
asyncio.run(main()) # RuntimeError: asyncio.run() cannot be called from a running event loopКорутины подробно
Техническая суть: Корутина — это специальный тип генератора, который может
приостановить выполнение на await и возобновить его позже. Event loop
управляет переключением между корутинами в точках приостановки.
Что такое корутина?
Корутина (coroutine) — это специальная функция, объявленная с ключевым словом async def, которая может приостановить своё выполнение на ключевом слове await и возобновить его позже, не блокируя при этом выполнение других корутин.
Ключевые характеристики корутины
1. Отличие от обычной функции:
from typing import Coroutine
# Обычная функция — выполняется немедленно
def regular_function() -> str:
return "result"
result = regular_function() # Сразу получаем "result"
print(result) # "result"
# Корутина — возвращает coroutine объект
async def coroutine_function() -> str:
return "result"
coro: Coroutine = coroutine_function() # Возвращает <coroutine object>
print(coro) # <coroutine object coroutine_function at 0x...>
# Чтобы получить результат, нужен await
result = await coroutine_function() # "result"2. Корутина vs Генератор:
Корутины в Python основаны на протоколе генераторов, но имеют важные отличия:
# Генератор — производит значения через yield
def generator():
yield 1
yield 2
yield 3
for value in generator():
print(value) # 1, 2, 3
# Корутина — приостанавливается на await
async def coroutine():
await asyncio.sleep(1)
return "done"
# Корутины нельзя итерировать, их нужно await
result = await coroutine() # "done"3. Три состояния корутины:
import asyncio
import inspect
async def my_coroutine() -> str:
await asyncio.sleep(1)
return "result"
async def demo():
# Создана, но не запущена
coro = my_coroutine()
print(f"State: {inspect.getcoroutinestate(coro)}") # CORO_CREATED
# Запущена, но приостановлена на await
task = asyncio.create_task(coro)
await asyncio.sleep(0.1)
print(f"State: {inspect.getcoroutinestate(coro)}") # CORO_SUSPENDED
# Завершена
result = await task
print(f"State: {inspect.getcoroutinestate(coro)}") # CORO_CLOSED
print(f"Result: {result}")4. Корутина может приостанавливаться множество раз:
async def multi_suspend() -> None:
"""Корутина с множественными приостановками."""
print("Начало работы")
await asyncio.sleep(1) # Приостановка 1
print("После 1 секунды")
await asyncio.sleep(1) # Приостановка 2
print("После 2 секунд")
result = await fetch_data() # Приостановка 3
print(f"Получены данные: {result}")
await asyncio.sleep(1) # Приостановка 4
print("Завершение работы")Как работают корутины под капотом
import asyncio
async def example() -> None:
"""Пример работы корутины."""
print("1. Начало выполнения")
# Здесь корутина приостанавливается
# Управление возвращается в event loop
await asyncio.sleep(1)
# Через 1 секунду event loop возобновляет выполнение
print("2. Продолжение после паузы")
# Что происходит:
# 1. Корутина начинает выполняться
# 2. Достигает await — приостанавливается
# 3. Event loop переключается на другие задачи
# 4. Через 1 сек event loop возвращается к корутине
# 5. Корутина продолжает выполнениеВажные правила работы с корутинами
✅ ПРАВИЛО 1: Корутину нужно вызывать с await или обернуть в задачу
# ❌ НЕПРАВИЛЬНО — корутина не выполнится
async def wrong():
my_coroutine() # Warning: coroutine was never awaited
# ✅ ПРАВИЛЬНО
async def correct():
await my_coroutine() # Корутина выполнится
# ✅ Альтернатива — create_task для фонового выполнения
async def also_correct():
task = asyncio.create_task(my_coroutine())
# Задача выполняется в фоне
await task # Ждём результат✅ ПРАВИЛО 2: Корутины можно вызывать только внутри async функций
# ❌ НЕПРАВИЛЬНО — нельзя await в обычной функции
def sync_function():
result = await async_function() # SyntaxError!
# ✅ ПРАВИЛЬНО — await только внутри async def
async def async_function():
result = await another_async_function()
return result
# Для запуска из синхронного кода используйте asyncio.run()
def main():
result = asyncio.run(async_function())✅ ПРАВИЛО 3: Не создавайте корутины напрямую (без async def)
# ❌ НЕПРАВИЛЬНО — старый способ (Python 3.4)
@asyncio.coroutine
def old_style():
yield from asyncio.sleep(1)
# ✅ ПРАВИЛЬНО — современный способ (Python 3.5+)
async def new_style():
await asyncio.sleep(1)Практический пример: параллельные корутины
import asyncio
from typing import List, Any
async def cook_pasta() -> str:
"""Асинхронное приготовление пасты (15 секунд)."""
print("Ставлю воду на плиту")
await asyncio.sleep(5) # ← Приостановка на 5 сек
print("Вода закипела!")
await asyncio.sleep(10) # ← Ещё одна приостановка
print("Паста готова!")
return "Паста"
async def make_salad() -> str:
"""Асинхронное приготовление салата (3 секунды)."""
print("Режу овощи")
await asyncio.sleep(3)
print("Салат готов!")
return "Салат"
# Готовим параллельно
async def cook_dinner() -> List[str]:
"""
Параллельное приготовление блюд.
Общее время = max(15, 3) = 15 секунд (не 15 + 3 = 18!)
"""
results: List[str] = await asyncio.gather(
cook_pasta(), # 15 секунд
make_salad() # 3 секунды
)
return results
if __name__ == "__main__":
dishes = asyncio.run(cook_dinner())
print(f"Готовые блюда: {dishes}")Вывод:
Ставлю воду на плиту
Режу овощи
Салат готов! ← Готов через 3 сек
Вода закипела! ← Через 5 сек
Паста готова! ← Через 15 сек от начала
Способы запуска корутин
import asyncio
from typing import List
async def task(task_id: int) -> int:
"""Асинхронная задача, возвращающая результат через 1 секунду."""
await asyncio.sleep(1)
return task_id * 10
# 1️⃣ await — последовательное выполнение
async def sequential_execution() -> None:
"""Выполняет задачи последовательно (медленно)."""
result1 = await task(1) # Ждём 1 сек
result2 = await task(2) # Ждём ещё 1 сек
result3 = await task(3) # Ждём ещё 1 сек
print(f"Последовательно (3 сек): {[result1, result2, result3]}")
# 2️⃣ create_task — параллельное выполнение с контролем
async def parallel_with_tasks() -> None:
"""Запускает задачи параллельно с возможностью отмены."""
# Создаём задачи (начинают выполняться сразу!)
task1 = asyncio.create_task(task(1))
task2 = asyncio.create_task(task(2))
task3 = asyncio.create_task(task(3))
# Можем делать что-то ещё...
print("Задачи выполняются в фоне")
await asyncio.sleep(0.5) # Имитация другой работы
# Ждём результаты (общее время ~1 сек, не 3!)
result1 = await task1
result2 = await task2
result3 = await task3
print(f"С create_task (~1 сек): {[result1, result2, result3]}")
# 3️⃣ gather — запуск нескольких задач одновременно
async def parallel_with_gather() -> None:
"""Самый простой способ запустить несколько задач параллельно."""
results: List[int] = await asyncio.gather(
task(1),
task(2),
task(3)
)
print(f"С gather (~1 сек): {results}") # [10, 20, 30]
if __name__ == "__main__":
asyncio.run(sequential_execution()) # ~3 секунды
asyncio.run(parallel_with_tasks()) # ~1 секунда
asyncio.run(parallel_with_gather()) # ~1 секундаEvent Loop — сердце asyncio
Критическое правило: asyncio.run() создает и управляет event loop
автоматически. Никогда не создавайте loop вручную через get_event_loop() или
new_event_loop() без крайней необходимости. Внутри корутин используйте
только await и asyncio.create_task().
Event Loop (цикл событий) — это менеджер, который:
- Отслеживает все корутины
- Переключается между ними в моменты
await - Возвращается к корутине, когда её ожидание завершено
Как это работает?
import asyncio
async def task(name, delay):
print(f"[{name}] Начало")
await asyncio.sleep(delay)
print(f"[{name}] Конец")
async def main():
await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
asyncio.run(main())Что происходит внутри:
0.0s: [A] Начало → await sleep(2) → event loop переключается
0.0s: [B] Начало → await sleep(1) → event loop переключается
0.0s: [C] Начало → await sleep(3) → event loop переключается
1.0s: [B] Конец ✓ (первая завершилась)
2.0s: [A] Конец ✓
3.0s: [C] Конец ✓
Управление event loop (редко нужно)
Важно: В 99% случаев вам НЕ нужно напрямую работать с event loop.
Используйте высокоуровневые API: asyncio.run(), asyncio.create_task(),
asyncio.gather(). Работа с loop напрямую нужна только для специфических
сценариев: интеграция с другими event loops, планирование callbacks, или
выполнение блокирующих операций.
Когда это нужно?
1. Выполнение блокирующих операций в отдельном потоке
import asyncio
import time
import requests # Блокирующая библиотека
def blocking_api_call(url: str) -> dict:
"""Синхронная функция, которая блокирует thread."""
response = requests.get(url) # Блокирующий I/O
time.sleep(1) # Блокирующая операция
return response.json()
async def fetch_data_from_blocking_lib():
"""Правильное использование блокирующей библиотеки в async коде."""
loop = asyncio.get_running_loop()
# ✅ Выполняем в отдельном потоке, не блокируя event loop
result = await loop.run_in_executor(
None, # None = default ThreadPoolExecutor
blocking_api_call,
"https://api.example.com/data"
)
return result
# Альтернатива с functools.partial для передачи аргументов
from functools import partial
async def fetch_multiple_blocking():
"""Параллельное выполнение нескольких блокирующих операций."""
loop = asyncio.get_running_loop()
urls = [
"https://api.example.com/1",
"https://api.example.com/2",
"https://api.example.com/3"
]
# Параллельно вызываем блокирующие функции в threads
tasks = [
loop.run_in_executor(None, partial(blocking_api_call, url))
for url in urls
]
results = await asyncio.gather(*tasks)
return results2. Планирование callbacks (низкоуровневый API)
import asyncio
def callback_function():
"""Обычная синхронная функция."""
print("Callback выполнен!")
async def schedule_callbacks():
"""Примеры планирования callbacks через loop."""
loop = asyncio.get_running_loop()
# Выполнить callback как можно скорее
loop.call_soon(callback_function)
# Выполнить callback через 5 секунд
loop.call_later(5, lambda: print("Прошло 5 секунд"))
# Выполнить callback в конкретное время
import time
when = loop.time() + 10 # Через 10 секунд
loop.call_at(when, lambda: print("Прошло 10 секунд"))
await asyncio.sleep(11) # Ждём выполнения callbacks
# ⚠️ В большинстве случаев вместо этого используйте:
async def better_approach():
"""Более простой и понятный способ."""
await asyncio.sleep(5)
print("Прошло 5 секунд") # Проще и понятнее!3. Получение информации о loop
async def loop_info():
"""Получение информации о текущем event loop."""
loop = asyncio.get_running_loop()
# Время работы loop (монотонные часы)
print(f"Loop time: {loop.time()}")
# Проверка, запущен ли loop
print(f"Is running: {loop.is_running()}")
# Отладка: все активные задачи
tasks = asyncio.all_tasks(loop)
print(f"Active tasks: {len(tasks)}")
for task in tasks:
print(f" - {task.get_name()}")Важные правила:
- ✅ Используйте
asyncio.get_running_loop()внутри async функций - ❌ Не используйте
asyncio.get_event_loop()(deprecated в Python 3.10+) - ❌ Не создавайте loop вручную через
asyncio.new_event_loop()без необходимости - ✅
asyncio.run()управляет loop автоматически — используйте его в точке входа
Практический пример: миграция с синхронной библиотеки
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
# ❌ Проблема: синхронная библиотека блокирует event loop
def old_sync_code():
"""Синхронный код, который блокирует выполнение."""
import requests
urls = ["https://example.com"] * 10
results = []
for url in urls:
response = requests.get(url) # Блокирует!
results.append(response)
return results
# ✅ Решение: используем ThreadPoolExecutor
async def new_async_code():
"""Асинхронная версия с использованием executor."""
import requests
loop = asyncio.get_running_loop()
urls = ["https://example.com"] * 10
# Создаём pool потоков
with ThreadPoolExecutor(max_workers=5) as executor:
tasks = [
loop.run_in_executor(executor, requests.get, url)
for url in urls
]
# Выполняем параллельно в threads
responses = await asyncio.gather(*tasks)
return responses
# ✅ Ещё лучше: используйте асинхронную библиотеку
async def best_approach():
"""Идеальный вариант с нативной async библиотекой."""
import aiohttp
urls = ["https://example.com"] * 10
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return responsesЗадачи (Tasks)
Best practice: Всегда используйте asyncio.create_task() для фонового
выполнения. Храните ссылки на задачи и явно отменяйте их при завершении
приложения через task.cancel(). Незавершенные задачи могут вызвать утечки
памяти и race conditions.
Tasks — это обёртка над корутинами для параллельного выполнения.
Создание и управление задачами
import asyncio
async def fetch_user(user_id):
await asyncio.sleep(1) # Имитация запроса к API
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# ❌ Плохо: последовательное выполнение
user1 = await fetch_user(1) # 1 сек
user2 = await fetch_user(2) # ещё 1 сек
user3 = await fetch_user(3) # ещё 1 сек
# Итого: 3 секунды
# ✅ Хорошо: параллельное выполнение
task1 = asyncio.create_task(fetch_user(1))
task2 = asyncio.create_task(fetch_user(2))
task3 = asyncio.create_task(fetch_user(3))
users = await asyncio.gather(task1, task2, task3)
# Итого: 1 секунда!
print(users)
asyncio.run(main())Обработка исключений в задачах
Критически важно: Исключения в задачах, созданных через create_task(),
НЕ прерывают программу автоматически! Если не сделать await task, исключение
будет потеряно и вы получите только warning в логах.
Проблема: "Тихие" исключения
import asyncio
async def failing_task():
"""Задача, которая падает с ошибкой."""
await asyncio.sleep(0.1)
raise ValueError("Something went wrong!")
# ❌ НЕПРАВИЛЬНО: Исключение будет потеряно
async def bad_example():
"""Исключение НЕ обрабатывается — плохо!"""
task = asyncio.create_task(failing_task())
# Делаем что-то другое
await asyncio.sleep(1)
# Задача упала, но мы об этом не узнали!
# В логах будет только:
# "Task exception was never retrieved"
print("Программа продолжает работать")
asyncio.run(bad_example())Правильные способы обработки исключений:
1. Явный await с try/except (рекомендуется)
import asyncio
import logging
logger = logging.getLogger(__name__)
async def fetch_data(url: str) -> dict:
"""Задача, которая может упасть."""
await asyncio.sleep(0.1)
if url == "bad_url":
raise ValueError(f"Invalid URL: {url}")
return {"url": url, "data": "success"}
# ✅ ПРАВИЛЬНО: Обрабатываем каждую задачу
async def correct_example_1():
"""Явная обработка исключений для каждой задачи."""
task1 = asyncio.create_task(fetch_data("https://api.com/1"))
task2 = asyncio.create_task(fetch_data("bad_url"))
task3 = asyncio.create_task(fetch_data("https://api.com/3"))
# Ждём каждую задачу и обрабатываем ошибки
for i, task in enumerate([task1, task2, task3], 1):
try:
result = await task
print(f"✅ Task {i}: {result}")
except ValueError as e:
logger.error(f"❌ Task {i} failed: {e}")
# Можем продолжить обработку других задач
except Exception as e:
logger.error(f"💥 Task {i} unexpected error: {e}")
asyncio.run(correct_example_1())2. gather() с return_exceptions=True (для параллельной обработки)
async def correct_example_2():
"""
Используем gather с return_exceptions=True.
Преимущество: Не останавливается на первой ошибке.
"""
results = await asyncio.gather(
fetch_data("https://api.com/1"),
fetch_data("bad_url"), # Упадёт с ошибкой
fetch_data("https://api.com/3"),
return_exceptions=True # ← Ключевой параметр!
)
# Обрабатываем результаты
for i, result in enumerate(results, 1):
if isinstance(result, Exception):
logger.error(f"❌ Task {i} failed: {result}")
else:
print(f"✅ Task {i}: {result}")
# Вывод:
# ✅ Task 1: {'url': 'https://api.com/1', 'data': 'success'}
# ❌ Task 2 failed: Invalid URL: bad_url
# ✅ Task 3: {'url': 'https://api.com/3', 'data': 'success'}
asyncio.run(correct_example_2())3. Done callback для фоновых задач
import asyncio
from typing import Set
# Хранилище активных задач
background_tasks: Set[asyncio.Task] = set()
def handle_task_result(task: asyncio.Task) -> None:
"""
Callback для обработки результата задачи.
Вызывается автоматически при завершении задачи.
"""
background_tasks.discard(task) # Убираем из set
try:
# Получаем результат или исключение
result = task.result()
logger.info(f"✅ Task completed: {result}")
except asyncio.CancelledError:
logger.info("🚫 Task was cancelled")
except Exception as e:
# Обрабатываем исключение из задачи
logger.error(f"❌ Task failed: {e}", exc_info=True)
# ✅ ПРАВИЛЬНО: Callback обработает исключение
async def correct_example_3():
"""Фоновые задачи с автоматической обработкой ошибок."""
for i in range(5):
url = "bad_url" if i == 2 else f"https://api.com/{i}"
task = asyncio.create_task(fetch_data(url))
# Добавляем callback для обработки результата
task.add_done_callback(handle_task_result)
# Сохраняем ссылку на задачу
background_tasks.add(task)
# Ждём завершения всех задач
await asyncio.gather(*background_tasks, return_exceptions=True)
asyncio.run(correct_example_3())4. TaskGroup (Python 3.11+) — современный способ
import asyncio
import sys
if sys.version_info >= (3, 11):
async def correct_example_4():
"""
TaskGroup автоматически обрабатывает исключения.
Python 3.11+ рекомендуемый способ!
"""
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(fetch_data("https://api.com/1"))
tg.create_task(fetch_data("bad_url"))
tg.create_task(fetch_data("https://api.com/3"))
except* ValueError as eg: # Exception group
for exc in eg.exceptions:
logger.error(f"❌ Task failed: {exc}")
# asyncio.run(correct_example_4())Сравнительная таблица подходов:
| Подход | Когда использовать | Плюсы | Минусы |
|---|---|---|---|
try/await | Небольшое кол-во задач | Полный контроль | Много кода |
gather(return_exceptions=True) | Много параллельных задач | Простота | Обрабатывать после |
add_done_callback() | Фоновые задачи | Асинхронная обработка | Сложнее отладка |
TaskGroup (3.11+) | Современный код | Автоматическая очистка | Требует Python 3.11+ |
Важные правила:
- ✅ Всегда делайте
await taskили используйтеreturn_exceptions=True - ✅ Всегда обрабатывайте исключения в задачах
- ✅ Логируйте ошибки с
exc_info=Trueдля stack trace - ❌ Никогда не создавайте задачи и не забывайте про них
- ❌ Никогда не игнорируйте warning "Task exception was never retrieved"
Практический пример: надёжный обработчик задач
import asyncio
import logging
from typing import List, Callable, Any, TypeVar
logger = logging.getLogger(__name__)
T = TypeVar('T')
async def run_tasks_safely(
tasks: List[Callable[[], Any]],
on_error: str = "continue" # "continue" | "stop" | "collect"
) -> List[Any]:
"""
Безопасный запуск задач с обработкой ошибок.
Args:
tasks: Список корутин для выполнения
on_error: Стратегия при ошибке:
- "continue": Продолжить выполнение других задач
- "stop": Остановить при первой ошибке
- "collect": Собрать все результаты и ошибки
Returns:
Список результатов (может содержать Exception при on_error="collect")
"""
task_objects = [asyncio.create_task(task()) for task in tasks]
if on_error == "continue":
# Продолжаем при ошибках
results = []
for task in task_objects:
try:
result = await task
results.append(result)
except Exception as e:
logger.error(f"Task failed: {e}", exc_info=True)
# Не прерываем цикл
return results
elif on_error == "stop":
# Останавливаемся при первой ошибке
results = []
for task in task_objects:
result = await task # Пробросит исключение
results.append(result)
return results
else: # "collect"
# Собираем все результаты включая ошибки
return await asyncio.gather(*task_objects, return_exceptions=True)
# Использование
async def example():
tasks = [
lambda: fetch_data("https://api.com/1"),
lambda: fetch_data("bad_url"),
lambda: fetch_data("https://api.com/3")
]
# Стратегия 1: Продолжить при ошибках
results = await run_tasks_safely(tasks, on_error="continue")
print(f"Получено результатов: {len(results)}") # 2 (одна упала)
# Стратегия 2: Собрать всё включая ошибки
all_results = await run_tasks_safely(tasks, on_error="collect")
for i, r in enumerate(all_results, 1):
if isinstance(r, Exception):
print(f"Task {i}: ERROR - {r}")
else:
print(f"Task {i}: OK - {r}")
asyncio.run(example())Отмена задач
Как работает отмена: task.cancel() НЕ останавливает задачу моментально!
Она только помечает задачу для отмены. Реальная отмена происходит при
следующем await внутри задачи, где возникает CancelledError. После
cancel() нужно сделать await task чтобы дождаться корректного завершения.
Механизм отмены задач пошагово:
import asyncio
async def long_task():
"""Задача с корректной обработкой отмены."""
print("1. Задача запустилась")
try:
print("2. Начинаю ждать 100 секунд...")
await asyncio.sleep(100) # ← Здесь возникнет CancelledError
print("3. Это НЕ выполнится если отменили")
except asyncio.CancelledError:
print("4. Получил CancelledError, выполняю cleanup...")
# Здесь можно закрыть соединения, сохранить состояние и т.д.
await asyncio.sleep(0.1) # Cleanup работа
print("5. Cleanup завершён")
raise # ⚠️ ВАЖНО! Пробрасываем дальше
print("6. Это тоже НЕ выполнится при отмене")
async def main():
task = asyncio.create_task(long_task())
# Ждём немного
await asyncio.sleep(0.5)
# ========================================
# ШАГ 1: task.cancel() — ЗАПРОС на отмену
# ========================================
print(">>> Вызываем task.cancel()")
task.cancel()
print(f">>> task.cancelled(): {task.cancelled()}") # True
# НО задача ещё работает! Она только помечена для отмены
# ========================================
# ШАГ 2: await task — ЖДЁМ реальной отмены
# ========================================
print(">>> Ждём завершения задачи через await...")
try:
await task # ← Здесь задача реально отменится
print(">>> Задача завершилась нормально")
except asyncio.CancelledError:
print(">>> Поймали CancelledError — задача отменена")
print(f">>> task.done(): {task.done()}") # True
print(f">>> task.cancelled(): {task.cancelled()}") # True
asyncio.run(main())Вывод программы:
1. Задача запустилась
2. Начинаю ждать 100 секунд...
>>> Вызываем task.cancel()
>>> task.cancelled(): True
>>> Ждём завершения задачи через await...
4. Получил CancelledError, выполняю cleanup...
5. Cleanup завершён
>>> Поймали CancelledError — задача отменена
>>> task.done(): True
>>> task.cancelled(): True
Почему нужен await после cancel()?
# ❌ НЕПРАВИЛЬНО: Не ждём завершения
async def bad_cancellation():
task = asyncio.create_task(long_task())
await asyncio.sleep(0.5)
task.cancel() # Запросили отмену
# НО НЕ ДОЖДАЛИСЬ завершения!
# Задача может:
# 1. Продолжать выполнение cleanup кода
# 2. Держать открытыми соединения
# 3. Не освободить ресурсы
# 4. Вызвать "Task was destroyed but it is pending!"
# ✅ ПРАВИЛЬНО: Ждём завершения
async def good_cancellation():
task = asyncio.create_task(long_task())
await asyncio.sleep(0.5)
task.cancel() # Запросили отмену
try:
await task # ← Дождались корректного завершения
except asyncio.CancelledError:
pass # Ожидаемо
# Теперь точно знаем что:
# 1. Cleanup код выполнился
# 2. Ресурсы освобождены
# 3. Задача полностью завершенаЧто происходит внутри задачи при cancel():
import asyncio
async def detailed_task():
"""Демонстрация поведения при отмене."""
print("Старт")
try:
# Код ДО первого await выполнится полностью
print("До await #1")
x = 10 + 20 # Выполнится
print(f"x = {x}")
# ← Первый await — ЗДЕСЬ может возникнуть CancelledError
await asyncio.sleep(1)
print("После await #1 (может не выполниться)")
await asyncio.sleep(1) # ← Или здесь
print("После await #2 (может не выполниться)")
except asyncio.CancelledError:
print("Отменено! Делаем cleanup...")
# Здесь можно безопасно использовать await
await cleanup_resources()
raise # Пробрасываем
async def cleanup_resources():
"""Cleanup может содержать await."""
print("Закрываем соединения...")
await asyncio.sleep(0.1)
print("Cleanup завершён")
async def demo():
task = asyncio.create_task(detailed_task())
await asyncio.sleep(0.05) # Даём задаче чуть-чуть выполниться
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Задача корректно отменена")
asyncio.run(demo())Вывод:
Старт
До await #1
x = 30
Отменено! Делаем cleanup...
Закрываем соединения...
Cleanup завершён
Задача корректно отменена
Важные детали:
1. Задача может проигнорировать отмену (антипаттерн!):
# ❌ ПЛОХО: Задача игнорирует отмену
async def stubborn_task():
try:
await asyncio.sleep(100)
except asyncio.CancelledError:
print("Я не хочу останавливаться!")
# НЕ пробрасываем raise — ПЛОХО!
await asyncio.sleep(10) # Продолжаем работать
return "I'm still here" # Задача НЕ отменилась
# Попытка отменить
task = asyncio.create_task(stubborn_task())
task.cancel()
result = await task # Вернёт "I'm still here", а не CancelledError!2. Множественная отмена безопасна:
task = asyncio.create_task(long_task())
task.cancel() # Первый вызов
task.cancel() # Второй вызов — ничего не сломается
task.cancel() # Третий вызов — тоже ок
await task # Дождались один раз3. Проверка состояния задачи:
task = asyncio.create_task(some_coroutine())
# До отмены
print(task.done()) # False
print(task.cancelled()) # False
# После cancel(), но ДО await
task.cancel()
print(task.done()) # False (ещё не завершилась)
print(task.cancelled()) # True (помечена для отмены)
# После await
try:
await task
except asyncio.CancelledError:
pass
print(task.done()) # True (завершена)
print(task.cancelled()) # True (отменена)Практический пример: graceful shutdown
import asyncio
import signal
from typing import Set
# Глобальное хранилище задач
background_tasks: Set[asyncio.Task] = set()
shutdown_event = asyncio.Event()
async def worker(worker_id: int):
"""Рабочая задача."""
try:
while not shutdown_event.is_set():
print(f"Worker {worker_id} working...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print(f"Worker {worker_id} cancelling...")
# Cleanup
await asyncio.sleep(0.1)
print(f"Worker {worker_id} stopped")
raise
async def main():
# Запускаем 5 workers
for i in range(5):
task = asyncio.create_task(worker(i))
background_tasks.add(task)
# Ждём сигнала завершения
await shutdown_event.wait()
# Graceful shutdown
print("Shutting down gracefully...")
# Шаг 1: Отменяем все задачи
for task in background_tasks:
task.cancel()
# Шаг 2: Ждём их завершения
results = await asyncio.gather(*background_tasks, return_exceptions=True)
# Шаг 3: Проверяем что все отменились
for i, result in enumerate(results):
if isinstance(result, asyncio.CancelledError):
print(f"Task {i} cancelled successfully")
print("All tasks stopped")
# Обработка SIGINT (Ctrl+C)
def handle_sigint():
print("\nReceived Ctrl+C")
shutdown_event.set()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, handle_sigint)
try:
loop.run_until_complete(main())
finally:
loop.close()Резюме:
task.cancel()— запрос на отмену (не моментальная остановка)- Реальная отмена происходит при следующем
awaitвнутри задачи - После
cancel()обязательно делайтеawait task - Внутри задачи обязательно пробрасывайте
raiseвexcept CancelledError - Это позволяет задаче выполнить cleanup код перед завершением
Таймауты
async def slow_operation():
await asyncio.sleep(10)
return "result"
async def main():
try:
# Ждём максимум 2 секунды
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
except asyncio.TimeoutError:
print("Операция заняла слишком много времени!")
asyncio.run(main())Ожидание нескольких операций
Выбор инструмента: - asyncio.gather() — когда нужны ВСЕ результаты в
определенном порядке - asyncio.wait() — когда нужен гибкий контроль (first
completed, timeouts, отмена) - asyncio.as_completed() — когда нужно
обрабатывать результаты по мере готовности
asyncio.gather() — собрать все результаты
async def fetch_data(id):
await asyncio.sleep(1)
return f"Data {id}"
async def main():
# Запускаем 5 операций параллельно
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3),
fetch_data(4),
fetch_data(5)
)
print(results) # ['Data 1', 'Data 2', ..., 'Data 5']
# Время: ~1 секунда, не 5!
asyncio.run(main())Обработка ошибок в gather:
async def may_fail(id):
await asyncio.sleep(1)
if id == 3:
raise ValueError("Ошибка в задаче 3")
return f"Result {id}"
async def main():
# return_exceptions=True — не прерывать на ошибке
results = await asyncio.gather(
may_fail(1),
may_fail(2),
may_fail(3), # ← Упадёт с ошибкой
may_fail(4),
return_exceptions=True # ← Вернёт Exception вместо raise
)
for i, result in enumerate(results, 1):
if isinstance(result, Exception):
print(f"Задача {i}: ОШИБКА - {result}")
else:
print(f"Задача {i}: {result}")
asyncio.run(main())asyncio.wait() — более гибкий контроль
async def task(id, delay):
await asyncio.sleep(delay)
return id
async def main():
tasks = [
asyncio.create_task(task(1, 1)),
asyncio.create_task(task(2, 2)),
asyncio.create_task(task(3, 3))
]
# Ждём первую завершившуюся задачу
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"Завершено: {len(done)}") # 1
print(f"Ожидают: {len(pending)}") # 2
# Отменяем оставшиеся
for task in pending:
task.cancel()
asyncio.run(main())asyncio.as_completed() — по мере готовности
async def download(url, delay):
await asyncio.sleep(delay)
return f"Content from {url}"
async def main():
urls = [
("url1.com", 3),
("url2.com", 1),
("url3.com", 2)
]
# Обрабатываем результаты по мере готовности
for coro in asyncio.as_completed([download(url, delay) for url, delay in urls]):
result = await coro
print(result) # Выведется в порядке: url2, url3, url1
asyncio.run(main())Асинхронные контекстные менеджеры
Управление ресурсами: Всегда используйте async with для асинхронных
ресурсов (database connections, HTTP sessions, file handles). Это гарантирует
корректное закрытие даже при исключениях и предотвращает утечки ресурсов.
Для ресурсов, которые требуют асинхронного открытия/закрытия.
import asyncio
class AsyncDatabase:
async def __aenter__(self):
print("Подключение к БД...")
await asyncio.sleep(0.5) # Имитация подключения
print("Подключено!")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Закрытие соединения...")
await asyncio.sleep(0.2)
print("Соединение закрыто")
async def query(self, sql):
await asyncio.sleep(0.1)
return f"Results for: {sql}"
async def main():
# Автоматическое управление ресурсом
async with AsyncDatabase() as db:
result = await db.query("SELECT * FROM users")
print(result)
# __aexit__ вызывается автоматически
asyncio.run(main())Реальный пример с aiohttp:
import aiohttp
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()Асинхронные итераторы
Потоковая обработка: Используйте асинхронные итераторы (async for) для
обработки данных по мере поступления — стримы, пагинация API, чтение больших
файлов. Это позволяет начать обработку до получения всех данных и снижает
потребление памяти.
Для асинхронной генерации значений.
import asyncio
class AsyncRange:
def __init__(self, start, stop):
self.current = start
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.5) # Асинхронная работа
value = self.current
self.current += 1
return value
async def main():
# Асинхронная итерация
async for number in AsyncRange(0, 5):
print(number) # Выводит 0, 1, 2, 3, 4 с задержкой
asyncio.run(main())Асинхронный генератор (проще):
async def async_range(stop):
for i in range(stop):
await asyncio.sleep(0.5)
yield i
async def main():
async for number in async_range(5):
print(number)
asyncio.run(main())Синхронизация
Race conditions в asyncio: Даже в однопоточном event loop возможны гонки
данных! Любой await — точка переключения контекста. Всегда используйте
примитивы синхронизации (Lock, Semaphore, Event) при работе с общим
состоянием.
Lock — взаимное исключение
import asyncio
class Counter:
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
# ✅ Правильно: с блокировкой
async with self.lock:
temp = self.value
await asyncio.sleep(0.01) # Имитация работы
self.value = temp + 1
async def broken_increment(self):
# ❌ Неправильно: гонка данных (race condition)
temp = self.value
await asyncio.sleep(0.01)
self.value = temp + 1
async def test():
counter = Counter()
# Запускаем 100 параллельных инкрементов
await asyncio.gather(*[counter.increment() for _ in range(100)])
print(f"С блокировкой: {counter.value}") # 100 ✓
counter.value = 0
await asyncio.gather(*[counter.broken_increment() for _ in range(100)])
print(f"Без блокировки: {counter.value}") # ~10-50 ✗
asyncio.run(test())Semaphore — ограничение параллелизма
import asyncio
# Ограничение: максимум 3 одновременных запроса
semaphore = asyncio.Semaphore(3)
async def fetch(id):
async with semaphore:
print(f"Запрос {id} начат")
await asyncio.sleep(2)
print(f"Запрос {id} завершён")
async def main():
# Запускаем 10 запросов, но одновременно выполняются только 3
await asyncio.gather(*[fetch(i) for i in range(10)])
asyncio.run(main())Вывод:
Запрос 0 начат ← Первые 3 начинаются сразу
Запрос 1 начат
Запрос 2 начат
Запрос 0 завершён ← Через 2 сек
Запрос 3 начат ← Только теперь начинается 4-й
...
Event — уведомление о событии
asyncio.Event — это примитив синхронизации, который работает как сигнальная лампочка: либо горит (событие произошло), либо не горит (событие ещё не произошло). Корутины могут ждать, пока "лампочка загорится", и все ожидающие будут разблокированы одновременно.
Как это работает?
Механизм работы:
- Создание:
event = asyncio.Event()— изначально событие не установлено (лампочка не горит) - Ожидание:
await event.wait()— корутина приостанавливается и ждёт, пока кто-то не установит событие - Установка:
event.set()— все ожидающие корутины разблокируются одновременно (broadcast) - Проверка:
event.is_set()— вернётTrueесли событие установлено,Falseесли нет - Сброс:
event.clear()— сбрасывает событие обратно (лампочка гаснет), можно ждать снова
Ключевая особенность: Это механизм "один ко многим" — один вызов set() разблокирует всех ожидающих.
Базовый пример
import asyncio
async def waiter(event, name):
"""Корутина, которая ждёт события."""
print(f"{name}: Жду события...")
await event.wait() # ← Приостанавливается здесь
print(f"{name}: Событие произошло!")
async def setter(event):
"""Корутина, которая устанавливает событие."""
await asyncio.sleep(2) # Ждём 2 секунды
print("🔔 Отправляю событие...")
event.set() # ← Разблокирует ВСЕХ ожидающих одновременно
async def main():
event = asyncio.Event()
# Запускаем 3 ожидающих + 1 отправителя
await asyncio.gather(
waiter(event, "Задача 1"),
waiter(event, "Задача 2"),
waiter(event, "Задача 3"),
setter(event)
)
asyncio.run(main())Вывод:
Задача 1: Жду события...
Задача 2: Жду события...
Задача 3: Жду события...
🔔 Отправляю событие...
Задача 1: Событие произошло! ← Все разблокировались
Задача 2: Событие произошло! ← одновременно
Задача 3: Событие произошло!
Практические сценарии использования
1. Graceful shutdown (корректное завершение приложения)
import asyncio
import signal
shutdown_event = asyncio.Event()
async def worker(worker_id: int):
"""Рабочая задача, которая останавливается по событию."""
while not shutdown_event.is_set():
print(f"Worker {worker_id} работает...")
await asyncio.sleep(1)
print(f"Worker {worker_id} завершён")
async def main():
# Запускаем 5 workers
workers = [asyncio.create_task(worker(i)) for i in range(5)]
# Ждём сигнала завершения (например, Ctrl+C)
await shutdown_event.wait()
# Все workers автоматически завершатся
await asyncio.gather(*workers)
# В production обрабатываем сигналы:
# signal.signal(signal.SIGINT, lambda s, f: shutdown_event.set())2. Ожидание инициализации сервиса
import asyncio
db_ready = asyncio.Event()
async def database_connection():
"""Подключение к БД (долгая операция)."""
print("Подключаюсь к БД...")
await asyncio.sleep(3) # Имитация подключения
print("✅ БД готова!")
db_ready.set() # Сигнализируем что готовы
async def api_handler(request_id: int):
"""API handler, который ждёт готовности БД."""
print(f"Запрос {request_id}: Ожидаю готовность БД...")
# Ждём пока БД инициализируется
await db_ready.wait()
print(f"Запрос {request_id}: Обрабатываю...")
await asyncio.sleep(0.5)
return f"Response {request_id}"
async def main():
# Запускаем всё одновременно
await asyncio.gather(
database_connection(),
api_handler(1),
api_handler(2),
api_handler(3)
)
asyncio.run(main())Вывод:
Подключаюсь к БД...
Запрос 1: Ожидаю готовность БД...
Запрос 2: Ожидаю готовность БД...
Запрос 3: Ожидаю готовность БД...
✅ БД готова!
Запрос 1: Обрабатываю...
Запрос 2: Обрабатываю...
Запрос 3: Обрабатываю...
3. Переиспользование Event (clear + set)
import asyncio
async def reusable_event_example():
"""Пример переиспользования события."""
event = asyncio.Event()
async def waiter():
for i in range(3):
print(f" Раунд {i+1}: Жду...")
await event.wait()
print(f" Раунд {i+1}: Получил!")
# После wait() НЕ нужно делать clear() вручную
async def controller():
for i in range(3):
await asyncio.sleep(1)
print(f"🔔 Отправляю сигнал {i+1}")
event.set()
await asyncio.sleep(0.1) # Даём обработать
event.clear() # ← Сбрасываем для повторного использования
await asyncio.gather(waiter(), controller())
asyncio.run(reusable_event_example())Вывод:
Раунд 1: Жду...
🔔 Отправляю сигнал 1
Раунд 1: Получил!
Раунд 2: Жду...
🔔 Отправляю сигнал 2
Раунд 2: Получил!
Раунд 3: Жду...
🔔 Отправляю сигнал 3
Раунд 3: Получил!
Важные детали и подводные камни
1. Event.wait() не сбрасывает событие автоматически
event = asyncio.Event()
event.set()
# Все эти wait() вернутся НЕМЕДЛЕННО
await event.wait() # Не блокируется
await event.wait() # Не блокируется
await event.wait() # Не блокируется
# Чтобы ждать снова, нужно сбросить:
event.clear()
await event.wait() # Теперь блокируется2. Порядок разблокировки не гарантирован
# Кто первый вызвал wait() — не обязательно первый получит управление
# После set() порядок выполнения зависит от event loop scheduler3. Event vs Flag (булева переменная)
# ❌ НЕПРАВИЛЬНО: race condition
class BrokenService:
def __init__(self):
self.ready = False # Обычная переменная
async def wait_ready(self):
while not self.ready: # Активное ожидание!
await asyncio.sleep(0.01) # Жжём CPU
# ✅ ПРАВИЛЬНО: используем Event
class CorrectService:
def __init__(self):
self.ready = asyncio.Event()
async def wait_ready(self):
await self.ready.wait() # Эффективное ожиданиеСравнение с другими примитивами
| Примитив | Разблокирует | Использование |
|---|---|---|
Event | Всех сразу | Broadcast сигнал (shutdown, init) |
Lock | Одного | Взаимное исключение (mutex) |
Semaphore | N задач | Ограничение параллелизма |
Condition | Выборочно | Сложная координация |
Резюме:
Event= сигнальная лампочка для координации множества корутинset()= включить лампочку (разблокировать всех)wait()= ждать пока лампочка загоритсяclear()= выключить лампочку (для переиспользования)is_set()= проверить состояние без ожидания
Используйте Event когда одно событие должно разблокировать много ожидающих корутин одновременно.
Очереди
Producer-Consumer паттерн: asyncio.Queue — thread-safe структура данных
для передачи данных между корутинами. Автоматически управляет блокировками и
backpressure через maxsize. Используйте для координации между
производителями и потребителями.
asyncio.Queue — это асинхронная очередь для безопасной передачи данных между корутинами. Она решает классическую задачу Producer-Consumer (производитель-потребитель): одни корутины создают задачи и добавляют их в очередь, другие корутины берут задачи из очереди и обрабатывают.
Как это работает?
Основные операции:
-
await queue.put(item)— добавить элемент в очередь- Если очередь полная (достигнут
maxsize),put()блокируется до освобождения места - Это механизм backpressure — производители не перегружают систему
- Если очередь полная (достигнут
-
await queue.get()— взять элемент из очереди- Если очередь пустая,
get()блокируется до появления элемента - Возвращает первый элемент (FIFO — First In First Out)
- Если очередь пустая,
-
queue.task_done()— сообщить что элемент обработан- Уменьшает внутренний счётчик незавершённых задач
- Нужно вызывать после обработки каждого
get()
-
await queue.join()— ждать пока все элементы обработаны- Блокируется пока счётчик не станет 0 (все
task_done()вызваны)
- Блокируется пока счётчик не станет 0 (все
Дополнительные методы:
queue.qsize()— текущий размер очереди (может быть неточным)queue.empty()— проверка что очередь пустаqueue.full()— проверка что очередь полная
Базовый пример: Producer-Consumer
import asyncio
import random
async def producer(queue, id):
"""Производитель — добавляет элементы в очередь."""
for i in range(5):
item = f"P{id}-Item{i}"
await queue.put(item)
print(f"📦 Производитель {id} создал: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
print(f"✅ Производитель {id} завершил работу")
async def consumer(queue, id):
"""Потребитель — обрабатывает элементы из очереди."""
while True:
item = await queue.get()
if item is None: # Сигнал завершения
queue.task_done()
break
print(f" ⚙️ Потребитель {id} обрабатывает: {item}")
await asyncio.sleep(random.uniform(0.2, 0.4))
queue.task_done() # ← Обязательно вызываем!
print(f" ✓ Потребитель {id} завершил: {item}")
async def main():
# Очередь размером 10 элементов (maxsize ограничивает переполнение)
queue = asyncio.Queue(maxsize=10)
# Запускаем 2 производителей
producers = [
asyncio.create_task(producer(queue, i))
for i in range(2)
]
# Запускаем 3 потребителя
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# Шаг 1: Ждём пока производители закончат создавать задачи
await asyncio.gather(*producers)
print("\n🏁 Все производители завершены\n")
# Шаг 2: Ждём пока все задачи из очереди будут обработаны
await queue.join()
print("🏁 Все задачи обработаны\n")
# Шаг 3: Отправляем сигналы завершения потребителям
for _ in consumers:
await queue.put(None)
# Шаг 4: Ждём завершения потребителей
await asyncio.gather(*consumers)
print("🏁 Все потребители завершены")
asyncio.run(main())Пример вывода:
📦 Производитель 0 создал: P0-Item0
📦 Производитель 1 создал: P1-Item0
⚙️ Потребитель 0 обрабатывает: P0-Item0
⚙️ Потребитель 1 обрабатывает: P1-Item0
📦 Производитель 0 создал: P0-Item1
⚙️ Потребитель 2 обрабатывает: P0-Item1
✓ Потребитель 0 завершил: P0-Item0
📦 Производитель 1 создал: P1-Item1
⚙️ Потребитель 0 обрабатывает: P1-Item1
✓ Потребитель 1 завершил: P1-Item0
📦 Производитель 0 создал: P0-Item2
⚙️ Потребитель 1 обрабатывает: P0-Item2
✓ Потребитель 2 завершил: P0-Item1
📦 Производитель 1 создал: P1-Item2
⚙️ Потребитель 2 обрабатывает: P1-Item2
[... ещё несколько строк ...]
✅ Производитель 0 завершил работу
✅ Производитель 1 завершил работу
🏁 Все производители завершены
✓ Потребитель 0 завершил: P1-Item4
✓ Потребитель 1 завершил: P0-Item4
✓ Потребитель 2 завершил: P1-Item3
🏁 Все задачи обработаны
🏁 Все потребители завершены
Что происходит:
- Параллельная работа: Производители создают задачи, потребители сразу начинают их обрабатывать
- Балансировка нагрузки: Три потребителя распределяют работу между собой автоматически
- Graceful shutdown:
- Ждём пока производители создадут все задачи
- Ждём пока потребители обработают всё (
queue.join()) - Отправляем сигнал завершения (
None) - Корректно завершаем потребителей
Практические сценарии
1. Web Scraping с ограничением параллелизма
Задача: Загрузить 1000 страниц, но не перегрузить сервер (не более 3 одновременных запросов).
Решение: Используем очередь как буфер между производителем URL и workers, которые их загружают.
Как это работает:
- Производитель добавляет все URL в очередь
- 3 workers берут URL из очереди и загружают параллельно
- Очередь с
maxsize=10ограничивает буфер (backpressure) - Когда worker загрузил страницу, берёт следующий URL из очереди
Преимущества:
- Автоматическая балансировка нагрузки между workers
- Ограничение параллелизма (не больше 3 запросов одновременно)
- Обработка ошибок не останавливает других workers
Пример кода:
import asyncio
import aiohttp
async def url_producer(queue, urls):
"""Производитель: добавляет URL в очередь."""
for url in urls:
await queue.put(url)
print(f"➕ Добавлен URL: {url}")
# Сигналы завершения для всех workers
for _ in range(3): # 3 workers
await queue.put(None)
async def scraper_worker(queue, worker_id, session):
"""Потребитель: скачивает страницы."""
while True:
url = await queue.get()
if url is None:
queue.task_done()
break
try:
print(f"🌐 Worker {worker_id}: загружаю {url}")
async with session.get(url, timeout=5) as response:
html = await response.text()
print(f"✅ Worker {worker_id}: загружено {len(html)} байт")
except Exception as e:
print(f"❌ Worker {worker_id}: ошибка {url} - {e}")
finally:
queue.task_done()
async def scrape_urls(urls):
"""Главная функция: координирует scraping."""
queue = asyncio.Queue(maxsize=10) # Буфер на 10 URL
async with aiohttp.ClientSession() as session:
# Запускаем producer
producer_task = asyncio.create_task(url_producer(queue, urls))
# Запускаем 3 workers
workers = [
asyncio.create_task(scraper_worker(queue, i, session))
for i in range(3)
]
# Ждём завершения
await producer_task
await queue.join()
await asyncio.gather(*workers)
# urls = ["https://example.com/page1", ...]
# asyncio.run(scrape_urls(urls))Пример вывода:
➕ Добавлен URL: https://example.com/page1
➕ Добавлен URL: https://example.com/page2
➕ Добавлен URL: https://example.com/page3
🌐 Worker 0: загружаю https://example.com/page1
🌐 Worker 1: загружаю https://example.com/page2
🌐 Worker 2: загружаю https://example.com/page3
✅ Worker 0: загружено 15234 байт
🌐 Worker 0: загружаю https://example.com/page4
...
2. Batch Processing (пакетная обработка)
Задача: Отправить 1000 писем, но API принимает только пакеты по 5 писем.
Решение: Накапливаем элементы в буфер, когда набралось 5 — обрабатываем пакет.
Как это работает:
- Производитель добавляет элементы в очередь по одному
- Потребитель накапливает их в список
- Когда в списке 5 элементов → отправляем пакет в API
- При завершении обрабатываем остатки (если меньше 5)
Когда использовать:
- Отправка данных в API с пакетными эндпоинтами
- Bulk операции с БД (INSERT 100 строк за раз)
- Оптимизация сетевых запросов
import asyncio
async def batch_processor(queue):
"""Обрабатывает элементы пакетами по 5 штук."""
batch = []
while True:
item = await queue.get()
if item is None: # Завершение
# Обрабатываем остатки
if batch:
await process_batch(batch)
queue.task_done()
break
batch.append(item)
# Когда накопили 5 элементов — обрабатываем пакет
if len(batch) == 5:
await process_batch(batch)
batch = []
queue.task_done()
async def process_batch(batch):
"""Обработка пакета элементов."""
print(f"📦 Обрабатываю пакет из {len(batch)} элементов: {batch}")
await asyncio.sleep(1) # Имитация обработки
print(f"✅ Пакет обработан")3. Очередь с приоритетами (Priority Queue)
Задача: Обрабатывать задачи в порядке важности, а не в порядке поступления.
Проблема обычной очереди:
# Обычная очередь (FIFO)
queue.put("Низкий приоритет") # Обработается первым
queue.put("КРИТИЧНО!") # Обработается вторым ❌Решение: PriorityQueue — извлекает элементы в порядке приоритета, а не в порядке добавления.
Как это работает:
- Каждый элемент имеет приоритет (число)
- Меньшее число = выше приоритет (1 важнее 5)
queue.get()всегда возвращает элемент с наименьшим приоритетом- Если приоритеты равны — порядок добавления (FIFO)
Зачем нужен @dataclass(order=True)?
# Без dataclass — нужно сравнивать вручную
item1 = (1, "Задача A") # (приоритет, данные)
item2 = (2, "Задача B")
# Но если приоритеты равны, Python попытается сравнить строки!
item1 = (1, {"data": "A"}) # (приоритет, dict)
item2 = (1, {"data": "B"})
# ❌ TypeError: '<' not supported between dict
# С dataclass — безопасное сравнение
@dataclass(order=True)
class PrioritizedItem:
priority: int # Используется для сравнения
item: Any = field(compare=False) # НЕ участвует в сравненииПростой пример:
import asyncio
from dataclasses import dataclass, field
from typing import Any
@dataclass(order=True)
class PrioritizedItem:
"""Элемент с приоритетом для очереди."""
priority: int # Сравнивается (меньше = важнее)
item: Any = field(compare=False) # НЕ сравнивается
async def priority_example():
"""Пример использования приоритетной очереди."""
queue = asyncio.PriorityQueue()
# Добавляем задачи в случайном порядке
await queue.put(PrioritizedItem(priority=5, item="Низкий приоритет"))
await queue.put(PrioritizedItem(priority=1, item="КРИТИЧНО!"))
await queue.put(PrioritizedItem(priority=3, item="Средний приоритет"))
await queue.put(PrioritizedItem(priority=1, item="Тоже критично!"))
print("Порядок обработки:\n")
# Обрабатываем в порядке приоритета (автоматически!)
while not queue.empty():
item = await queue.get()
print(f" [Приоритет {item.priority}] {item.item}")
await asyncio.sleep(0.5) # Имитация обработки
queue.task_done()
asyncio.run(priority_example())Вывод:
Порядок обработки:
[Приоритет 1] КРИТИЧНО! ← Обработалось первым (priority=1)
[Приоритет 1] Тоже критично! ← Второе с priority=1 (FIFO внутри приоритета)
[Приоритет 3] Средний приоритет ← Третье (priority=3)
[Приоритет 5] Низкий приоритет ← Последнее (priority=5)
Когда использовать Priority Queue:
- ✅ Обработка заявок/тикетов с разным приоритетом
- ✅ Task scheduling (планировщик задач)
- ✅ Event processing (критичные события обрабатываются первыми)
- ✅ Rate limiting с приоритетами (VIP пользователи в приоритете)
- ❌ НЕ нужно если все задачи равнозначны (используйте обычную Queue)
Типы очередей
# 1. FIFO (First In First Out) — обычная очередь
queue = asyncio.Queue(maxsize=10)
# 2. LIFO (Last In First Out) — стек
stack = asyncio.LifoQueue(maxsize=10)
await stack.put("first")
await stack.put("second")
await stack.put("third")
print(await stack.get()) # "third" (последний добавленный)
# 3. Приоритетная очередь
priority_queue = asyncio.PriorityQueue()
await priority_queue.put((1, "низкий приоритет"))
await priority_queue.put((0, "высокий приоритет"))
item = await priority_queue.get() # (0, "высокий приоритет")Важные детали
1. Обязательно вызывайте task_done()
# ❌ НЕПРАВИЛЬНО: забыли task_done()
async def bad_consumer(queue):
while True:
item = await queue.get()
if item is None:
break
process(item)
# Забыли queue.task_done()!
# await queue.join() будет висеть вечно!
# ✅ ПРАВИЛЬНО
async def good_consumer(queue):
while True:
item = await queue.get()
if item is None:
queue.task_done() # Для None тоже!
break
process(item)
queue.task_done() # Обязательно2. Backpressure через maxsize
# Если maxsize=5, а производитель пытается добавить 6-й элемент:
queue = asyncio.Queue(maxsize=5)
for i in range(10):
await queue.put(i) # ← На 6-м элементе БЛОКИРУЕТСЯ
# Ждёт пока потребитель не освободит место3. Graceful shutdown pattern
async def graceful_shutdown():
"""Правильное завершение producer-consumer."""
queue = asyncio.Queue()
# Запускаем N consumers
num_consumers = 3
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(num_consumers)
]
# Производим данные
await producer(queue)
# Отправляем N сигналов завершения (None)
for _ in range(num_consumers):
await queue.put(None)
# Ждём завершения всех consumers
await asyncio.gather(*consumers)Резюме:
asyncio.Queue— безопасная передача данных между корутинами- Автоматическая блокировка на
put()(переполнение) иget()(пустая очередь) - Обязательно вызывайте
task_done()после обработки - Используйте
queue.join()для ожидания завершения всех задач - Три типа:
Queue(FIFO),LifoQueue(LIFO),PriorityQueue
Работа с сетью
Работа с сетью — одна из главных причин использовать asyncio. Асинхронные HTTP-запросы и запросы к базам данных позволяют обрабатывать тысячи одновременных подключений на одном потоке, потому что пока мы ждём ответа от сервера, event loop переключается на другие задачи.
Production best practices:
- Переиспользуйте
aiohttp.ClientSession— создавайте один раз на приложение, не создавайте на каждый запрос (утечка соединений!) - ВСЕГДА устанавливайте таймауты — используйте
ClientTimeout, иначе запрос может висеть вечно - Connection pooling для БД —
asyncpg.create_pool()переиспользует соединения вместо создания новых каждый раз - Обрабатывайте ошибки — сетевые запросы падают часто, используйте
return_exceptions=Trueвgather()
Асинхронные HTTP-запросы (aiohttp)
aiohttp — это асинхронная библиотека для работы с HTTP. В отличие от requests (блокирующая), aiohttp полностью асинхронна и интегрируется с asyncio.
Почему не requests в async коде?
# ❌ ПЛОХО: requests блокирует event loop
async def bad_example():
import requests
response = requests.get("https://api.com") # Блокирует!
# Пока ждём ответ, event loop ЗАМОРАЖИВАЕТСЯ
# Никакие другие корутины не выполняются!
# ✅ ХОРОШО: aiohttp не блокирует event loop
async def good_example():
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get("https://api.com") as response:
# Пока ждём ответ, event loop переключается на другие задачи
data = await response.text()Ключевые концепции aiohttp:
-
ClientSession — менеджер соединений (держит pool соединений, cookies, headers)
- Создавайте один раз на приложение
- НЕ создавайте на каждый запрос (утечка соединений!)
- Закрывается автоматически через
async with
-
Таймауты — ОБЯЗАТЕЛЬНО устанавливайте, иначе запрос может висеть бесконечно
ClientTimeout(total=5)— общий таймаут на запросClientTimeout(connect=1, sock_read=5)— раздельные таймауты
-
Обработка ошибок — сеть ненадёжна, всегда обрабатывайте исключения
Базовый пример: параллельная загрузка URL
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Загрузка одного URL."""
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls):
"""Параллельная загрузка нескольких URL."""
# Создаём одну сессию для всех запросов
async with aiohttp.ClientSession() as session:
# Создаём задачи для параллельной загрузки
tasks = [fetch_url(session, url) for url in urls]
# Выполняем все запросы параллельно
# return_exceptions=True — не падаем на первой ошибке
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
urls = [
'https://httpbin.org/delay/1', # Ответит через 1 сек
'https://httpbin.org/delay/2', # Ответит через 2 сек
'https://httpbin.org/delay/1' # Ответит через 1 сек
]
# Общее время выполнения: ~2 секунды (не 1+2+1=4!)
results = await fetch_all(urls)
print(f"Загружено {len(results)} страниц")
asyncio.run(main())Что происходит:
- Создаём
ClientSessionодин раз - Отправляем все 3 запроса одновременно
- Пока ждём ответов, event loop переключается между запросами
- Общее время = время самого медленного запроса (2 сек), не сумма всех (4 сек)
Production-ready пример с обработкой ошибок
import asyncio
import aiohttp
from typing import Optional, Dict, Any
async def fetch_safe(
session: aiohttp.ClientSession,
url: str,
timeout: int = 5
) -> Optional[Dict[str, Any]]:
"""
Безопасная загрузка URL с обработкой всех возможных ошибок.
Returns:
JSON данные или None при ошибке
"""
try:
# Устанавливаем таймаут
timeout_config = aiohttp.ClientTimeout(total=timeout)
async with session.get(url, timeout=timeout_config) as response:
# Проверяем статус код (raise если 4xx/5xx)
response.raise_for_status()
# Парсим JSON
data = await response.json()
print(f"✅ {url}: {response.status}")
return data
except aiohttp.ClientError as e:
# Ошибки сети: ConnectionError, InvalidURL, etc.
print(f"❌ Ошибка сети {url}: {e}")
return None
except asyncio.TimeoutError:
# Таймаут
print(f"⏱️ Таймаут {url}")
return None
except Exception as e:
# Неожиданные ошибки
print(f"💥 Неожиданная ошибка {url}: {e}")
return None
async def main():
"""Пример использования с обработкой ошибок."""
urls = [
'https://httpbin.org/json', # Успешно
'https://httpbin.org/status/500', # 500 ошибка
'https://invalid-url-xyz.com', # Несуществующий домен
'https://httpbin.org/delay/10', # Таймаут (> 5 сек)
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_safe(session, url) for url in urls]
results = await asyncio.gather(*tasks)
# Фильтруем успешные результаты
successful = [r for r in results if r is not None]
print(f"\n📊 Успешно загружено: {len(successful)}/{len(urls)}")
asyncio.run(main())Вывод:
✅ https://httpbin.org/json: 200
❌ Ошибка сети https://httpbin.org/status/500: 500, message='Internal Server Error'
❌ Ошибка сети https://invalid-url-xyz.com: Cannot connect to host
⏱️ Таймаут https://httpbin.org/delay/10
📊 Успешно загружено: 1/4
Почему переиспользовать ClientSession?
# ❌ ПЛОХО: создаём session на каждый запрос
async def bad_approach():
for url in urls:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print(await response.text())
# Проблемы:
# 1. Новый TCP handshake на каждый запрос (медленно)
# 2. Нет переиспользования соединений
# 3. Нет connection pooling
# 4. Утечка памяти при большом количестве запросов
# ✅ ХОРОШО: одна session на всё приложение
async def good_approach():
async with aiohttp.ClientSession() as session:
for url in urls:
async with session.get(url) as response:
print(await response.text())
# Преимущества:
# 1. Переиспользование TCP соединений (HTTP keep-alive)
# 2. Connection pooling (до 100 соединений по умолчанию)
# 3. Автоматическое управление cookies
# 4. Меньше overhead на создание/удаление сессийРабота с базой данных (asyncpg)
asyncpg — это самый быстрый драйвер PostgreSQL для Python, полностью асинхронный. Он в 3-5 раз быстрее чем psycopg2 (синхронный драйвер) благодаря нативной интеграции с asyncio и оптимизированному протоколу.
Почему asyncpg в async коде?
# ❌ ПЛОХО: psycopg2 блокирует event loop
async def bad_example():
import psycopg2
conn = psycopg2.connect("...") # Блокирует!
cursor = conn.cursor()
cursor.execute("SELECT * FROM users") # Блокирует!
# Пока ждём БД, весь event loop стоит
# ✅ ХОРОШО: asyncpg не блокирует event loop
async def good_example():
import asyncpg
conn = await asyncpg.connect("...") # Не блокирует
rows = await conn.fetch("SELECT * FROM users") # Не блокирует
# Пока ждём БД, event loop обрабатывает другие запросыБазовый пример: простой запрос
import asyncio
import asyncpg
async def fetch_users():
"""Простой запрос к PostgreSQL."""
# Подключение к БД
conn = await asyncpg.connect(
user='user',
password='password',
database='mydb',
host='localhost'
)
try:
# Выполнение SELECT запроса
rows = await conn.fetch('SELECT id, name FROM users LIMIT 10')
# Обработка результатов
for row in rows:
# row — это dict-like объект
print(f"User {row['id']}: {row['name']}")
# Вставка данных с параметрами ($1, $2 — защита от SQL injection)
await conn.execute(
'INSERT INTO users(name, email) VALUES($1, $2)',
'John Doe',
'john@example.com'
)
print("✅ Данные вставлены")
finally:
# ОБЯЗАТЕЛЬНО закрываем соединение
await conn.close()
asyncio.run(fetch_users())Методы выполнения запросов:
conn.fetch(query, *args)— возвращает список строк (SELECT)conn.fetchrow(query, *args)— возвращает одну строку (SELECT ... LIMIT 1)conn.fetchval(query, *args)— возвращает одно значение (SELECT COUNT(*))conn.execute(query, *args)— выполняет запрос без возврата данных (INSERT/UPDATE/DELETE)
Connection Pool — переиспользование соединений
Создание нового подключения к БД дорого (TCP handshake, аутентификация, инициализация). Connection Pool держит пул открытых соединений и переиспользует их.
import asyncio
import asyncpg
async def create_pool():
"""Создание connection pool."""
return await asyncpg.create_pool(
user='user',
password='password',
database='mydb',
host='localhost',
min_size=10, # Минимум 10 соединений всегда открыты
max_size=20, # Максимум 20 соединений
command_timeout=5 # Таймаут на выполнение команды
)
async def query_with_pool(pool):
"""Использование pool для запросов."""
# Берём соединение из pool
async with pool.acquire() as connection:
# Используем соединение
result = await connection.fetchrow('SELECT NOW()')
print(f"Текущее время в БД: {result['now']}")
# Соединение автоматически вернётся в pool после `async with`
async def main():
"""Production-ready пример с pool."""
# Создаём pool один раз при старте приложения
pool = await create_pool()
try:
# Выполняем множество параллельных запросов
tasks = [query_with_pool(pool) for _ in range(100)]
await asyncio.gather(*tasks)
# Pool автоматически распределит 100 запросов между 20 соединениями
finally:
# Закрываем pool при завершении приложения
await pool.close()
asyncio.run(main())Преимущества Connection Pool:
- Производительность — не создаём новое соединение на каждый запрос
- Ограничение нагрузки — максимум
max_sizeодновременных соединений к БД - Автоматическое переподключение — при разрыве соединения pool пересоздаёт его
- Минимальная задержка —
min_sizeсоединений всегда готовы к использованию
Сравнение производительности
import asyncio
import asyncpg
import time
# ❌ Без pool: создаём новое соединение каждый раз
async def without_pool():
start = time.time()
for i in range(100):
conn = await asyncpg.connect("postgresql://...") # Медленно!
await conn.fetchval("SELECT 1")
await conn.close()
print(f"Без pool: {time.time() - start:.2f} сек") # ~15-20 секунд
# ✅ С pool: переиспользуем соединения
async def with_pool():
start = time.time()
pool = await asyncpg.create_pool("postgresql://...")
async def query():
async with pool.acquire() as conn:
await conn.fetchval("SELECT 1")
await asyncio.gather(*[query() for _ in range(100)])
await pool.close()
print(f"С pool: {time.time() - start:.2f} сек") # ~0.5-1 секунда!Резюме по работе с сетью:
- aiohttp — для асинхронных HTTP-запросов (НЕ используйте requests в async!)
- asyncpg — для асинхронной работы с PostgreSQL (в 3-5 раз быстрее psycopg2)
- Переиспользуйте ресурсы:
ClientSessionдля HTTP, connection pool для БД - Всегда устанавливайте таймауты — сеть ненадёжна
- Обрабатывайте ошибки — используйте
try/exceptиreturn_exceptions=True
Практические примеры
Итеративный подход: Начинайте с простого рабочего примера, профилируйте узкие места, затем масштабируйте. Asyncio эффективен при обработке множества параллельных задач, а не при одной длинной операции.
Пример 1: Асинхронный веб-скрапер
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def scrape_page(session, url):
"""Парсинг одной страницы"""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
title = soup.find('title')
title_text = title.text.strip() if title else "No title"
# Находим все ссылки
links = [a['href'] for a in soup.find_all('a', href=True)]
return {
'url': url,
'title': title_text,
'links_count': len(links)
}
except Exception as e:
return {'url': url, 'error': str(e)}
async def scrape_multiple(urls):
"""Параллельный парсинг нескольких страниц"""
async with aiohttp.ClientSession() as session:
tasks = [scrape_page(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
'https://example.com',
'https://python.org',
'https://github.com'
]
results = await scrape_multiple(urls)
for result in results:
if 'error' in result:
print(f"❌ {result['url']}: {result['error']}")
else:
print(f"✅ {result['url']}")
print(f" Заголовок: {result['title']}")
print(f" Ссылок: {result['links_count']}")
# asyncio.run(main())Пример 2: Rate Limiter (ограничение частоты запросов)
import asyncio
import time
class RateLimiter:
"""Ограничитель: max_calls запросов за period секунд"""
def __init__(self, max_calls, period):
self.max_calls = max_calls
self.period = period
self.semaphore = asyncio.Semaphore(max_calls)
self.calls = []
async def __aenter__(self):
await self.semaphore.acquire()
# Удаляем старые вызовы
now = time.time()
self.calls = [call_time for call_time in self.calls
if now - call_time < self.period]
# Если достигли лимита, ждём
if len(self.calls) >= self.max_calls:
sleep_time = self.period - (now - self.calls[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.calls.append(time.time())
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.semaphore.release()
# Использование
limiter = RateLimiter(max_calls=5, period=1.0) # 5 запросов в секунду
async def api_call(id):
async with limiter:
print(f"Запрос {id} в {time.time():.1f}")
await asyncio.sleep(0.1)
async def main():
# Пытаемся сделать 20 запросов
await asyncio.gather(*[api_call(i) for i in range(20)])
# Будет выполняться со скоростью 5 запросов/сек
# asyncio.run(main())Пример 3: Параллельная обработка файлов
import asyncio
import aiofiles
from pathlib import Path
async def process_file(file_path):
"""Асинхронное чтение и обработка файла"""
try:
async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
content = await f.read()
# Обработка
lines = content.split('\n')
words = content.split()
return {
'file': file_path.name,
'lines': len(lines),
'words': len(words),
'chars': len(content)
}
except Exception as e:
return {'file': file_path.name, 'error': str(e)}
async def process_directory(directory):
"""Обработка всех .txt файлов в директории"""
path = Path(directory)
txt_files = list(path.glob('*.txt'))
if not txt_files:
print("Файлы не найдены")
return []
print(f"Найдено {len(txt_files)} файлов")
# Обрабатываем параллельно
tasks = [process_file(file) for file in txt_files]
results = await asyncio.gather(*tasks)
return results
async def main():
results = await process_directory('.')
for result in results:
if 'error' in result:
print(f"❌ {result['file']}: {result['error']}")
else:
print(f"✅ {result['file']}: "
f"{result['lines']} строк, "
f"{result['words']} слов, "
f"{result['chars']} символов")
# asyncio.run(main())Распространённые ошибки и их решения
Топ-3 ошибки в production: 1. Забытый await — возвращает coroutine
вместо результата 2. Блокирующие операции (time.sleep, синхронные I/O) —
замораживают event loop 3. Множественный asyncio.run() — RuntimeError в
запущенном loop
Ошибка 1: Забытый await
# ❌ НЕПРАВИЛЬНО
async def wrong():
result = some_async_function() # Вернёт coroutine объект!
print(result) # <coroutine object ...>
# ✅ ПРАВИЛЬНО
async def correct():
result = await some_async_function() # Ждём результат
print(result) # Реальный результатКак понять: Если видите warning coroutine was never awaited — забыли await.
Ошибка 2: Блокирующие операции в async
import time
# ❌ НЕПРАВИЛЬНО: блокирует event loop
async def wrong():
time.sleep(5) # Останавливает ВСЁ на 5 секунд!
# ✅ ПРАВИЛЬНО: не блокирует event loop
async def correct():
await asyncio.sleep(5) # Освобождает event loopПравило: В асинхронном коде используйте только асинхронные операции.
Для блокирующих операций используйте run_in_executor:
async def process_cpu_intensive():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, # Default ThreadPoolExecutor
cpu_intensive_function,
arg1, arg2
)
return resultОшибка 3: Множественный asyncio.run()
# ❌ НЕПРАВИЛЬНО
async def wrong():
asyncio.run(some_coroutine()) # Event loop уже запущен!
# ✅ ПРАВИЛЬНО
async def correct():
await some_coroutine() # Просто await
# asyncio.run() только в точке входа
asyncio.run(correct())Ошибка 4: Незахваченные исключения в задачах
async def risky_task():
raise ValueError("Ошибка!")
# ❌ НЕПРАВИЛЬНО: исключение может быть проигнорировано
async def wrong():
task = asyncio.create_task(risky_task())
# Если не делаем await, исключение теряется!
# ✅ ПРАВИЛЬНО: обрабатываем исключение
async def correct():
task = asyncio.create_task(risky_task())
try:
await task
except ValueError as e:
print(f"Поймана ошибка: {e}")
asyncio.run(correct())Ошибка 5: Гонка данных (race condition)
# ❌ НЕПРАВИЛЬНО
class WrongCounter:
def __init__(self):
self.count = 0
async def increment(self):
temp = self.count
await asyncio.sleep(0.001) # Другие корутины могут вмешаться!
self.count = temp + 1
# ✅ ПРАВИЛЬНО: используем Lock
class CorrectCounter:
def __init__(self):
self.count = 0
self.lock = asyncio.Lock()
async def increment(self):
async with self.lock:
temp = self.count
await asyncio.sleep(0.001)
self.count = temp + 1Ошибка 6: Неправильный порядок в gather
# gather сохраняет порядок!
results = await asyncio.gather(
slow_task(), # 5 секунд
fast_task(), # 1 секунда
medium_task() # 3 секунды
)
# results[0] — результат slow_task
# results[1] — результат fast_task
# results[2] — результат medium_task
# Порядок соответствует порядку вызова, не времени выполнения!Чек-лист: когда использовать asyncio
Принцип выбора: Asyncio эффективен, когда программа большую часть времени
ожидает (I/O-bound). Для CPU-интенсивных задач используйте multiprocessing
или ProcessPoolExecutor. Для смешанных задач комбинируйте подходы.
✅ Используйте asyncio когда:
- Много I/O операций (сеть, файлы, БД)
- Нужно обрабатывать тысячи одновременных подключений
- Веб-скрейпинг, API запросы
- WebSocket серверы
- Чат-боты, real-time приложения
❌ НЕ используйте asyncio когда:
- CPU-интенсивные вычисления → используйте
multiprocessing - Простой скрипт с 1-2 запросами → синхронный код проще
- Библиотека не поддерживает asyncio → придётся блокировать event loop
Полезные паттерны
Production-ready паттерны: Эти шаблоны проверены в боевых условиях и решают реальные проблемы — сетевые сбои, перегрузки, graceful shutdown. Внедряйте их сразу, не дожидаясь проблем.
Паттерн 1: Retry с экспоненциальной задержкой
async def retry_with_backoff(coro, max_retries=3):
"""Повторная попытка с увеличивающейся задержкой"""
for attempt in range(max_retries):
try:
return await coro
except Exception as e:
if attempt == max_retries - 1:
raise
delay = 2 ** attempt # 1, 2, 4, 8...
print(f"Ошибка {e}, повтор через {delay} сек")
await asyncio.sleep(delay)Паттерн 2: Circuit Breaker
class CircuitBreaker:
"""Прерыватель: отключает сервис после N ошибок"""
def __init__(self, max_failures=5, timeout=60):
self.max_failures = max_failures
self.timeout = timeout
self.failures = 0
self.last_failure_time = None
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
async def call(self, coro):
if self.state == "OPEN":
# Проверяем, можно ли попробовать снова
if time.time() - self.last_failure_time > self.timeout:
self.state = "HALF_OPEN"
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await coro
self.failures = 0
self.state = "CLOSED"
return result
except Exception as e:
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.max_failures:
self.state = "OPEN"
raiseПаттерн 3: Graceful Shutdown
async def graceful_shutdown(tasks, timeout=10):
"""Корректное завершение задач"""
# Даём задачам время завершиться
done, pending = await asyncio.wait(
tasks,
timeout=timeout,
return_when=asyncio.ALL_COMPLETED
)
# Отменяем незавершённые
for task in pending:
task.cancel()
# Ждём отмены
await asyncio.gather(*pending, return_exceptions=True)
print(f"Завершено: {len(done)}, отменено: {len(pending)}")Заключение
Ключевые принципы asyncio: - Asyncio оптимизирует ожидание I/O операций,
не ускоряет вычисления - Event loop управляет переключением между корутинами в
точках await - Production требует: type hints, обработку ошибок, мониторинг,
graceful shutdown - Тестируйте, профилируйте, мониторьте — asyncio требует
другого подхода к отладке
Ключевые концепции
- Asyncio эффективен для I/O — не для вычислений
- await передаёт управление event loop
- create_task() для параллельного выполнения
- gather() для сбора результатов
- Синхронизация через Lock/Semaphore/Event
- Очереди для producer-consumer
Следующие шаги
- Изучите
aiohttpдля HTTP-запросов - Попробуйте
asyncpgдля PostgreSQL - Изучите
FastAPI— async веб-фреймворк - Читайте официальную документацию Python asyncio
Полезные ресурсы
Помните: Асинхронное программирование — это инструмент. Используйте его там, где он даёт преимущество, а не везде подряд!
Debugging и профилирование asyncio приложений
Критически важно для production: Asyncio требует специальных инструментов для отладки. Стандартный debugger и print() недостаточны для диагностики race conditions, deadlocks и performance issues.
Включение debug режима
import asyncio
import warnings
# Метод 1: Через переменную окружения
# export PYTHONASYNCIODEBUG=1
# Метод 2: Программно
asyncio.run(main(), debug=True)
# Метод 3: Через loop
loop = asyncio.new_event_loop()
loop.set_debug(True)
# Debug режим включает:
# - Предупреждения о медленных корутинах (> 100ms)
# - Детектирование забытых await
# - Трейсы создания задачОтслеживание всех запущенных задач
import asyncio
from typing import Set
async def monitor_tasks() -> None:
"""Мониторинг всех активных задач в event loop."""
while True:
tasks: Set[asyncio.Task] = asyncio.all_tasks()
print(f"\n=== Активных задач: {len(tasks)} ===")
for task in tasks:
# Получаем информацию о задаче
coro = task.get_coro()
print(f"Задача: {coro.__name__}")
print(f" Выполнена: {task.done()}")
print(f" Отменена: {task.cancelled()}")
# Стек вызовов (только в debug mode)
if not task.done():
stack = task.get_stack()
if stack:
print(f" Стек: {len(stack)} фреймов")
await asyncio.sleep(5)
async def main() -> None:
# Запускаем мониторинг в фоне
monitor = asyncio.create_task(monitor_tasks())
# Ваш код здесь
await asyncio.sleep(30)
monitor.cancel()
if __name__ == "__main__":
asyncio.run(main(), debug=True)Детектирование медленных корутин
import asyncio
import time
import functools
from typing import Callable, Any
def slow_callback_duration(seconds: float = 0.1) -> None:
"""Устанавливает порог для предупреждений о медленных callback."""
loop = asyncio.get_running_loop()
loop.slow_callback_duration = seconds
def log_execution_time(func: Callable) -> Callable:
"""Декоратор для логирования времени выполнения корутин."""
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
start = time.perf_counter()
try:
return await func(*args, **kwargs)
finally:
duration = time.perf_counter() - start
if duration > 0.1: # Порог 100ms
print(f"⚠️ Медленная корутина {func.__name__}: {duration:.2f}s")
return wrapper
@log_execution_time
async def slow_operation() -> str:
"""Пример медленной операции."""
await asyncio.sleep(0.5)
return "done"
async def main() -> None:
slow_callback_duration(0.1)
await slow_operation()
if __name__ == "__main__":
asyncio.run(main())Профилирование с cProfile
import asyncio
import cProfile
import pstats
from io import StringIO
async def my_async_app() -> None:
"""Ваше приложение."""
tasks = [asyncio.sleep(0.1) for _ in range(100)]
await asyncio.gather(*tasks)
def profile_asyncio() -> None:
"""Профилирование asyncio приложения."""
profiler = cProfile.Profile()
profiler.enable()
# Запускаем приложение
asyncio.run(my_async_app())
profiler.disable()
# Выводим статистику
stream = StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats('cumulative')
stats.print_stats(20) # Топ-20 функций
print(stream.getvalue())
if __name__ == "__main__":
profile_asyncio()Оптимизации Python 3.12+
Python 3.12 добавил eager task factory — оптимизацию, которая запускает задачи синхронно, если они завершаются немедленно (без await). Это значительно ускоряет короткие задачи.
import asyncio
import sys
from typing import List
# Проверяем версию Python
if sys.version_info >= (3, 12):
print("✅ Python 3.12+ detected, using eager task factory")
async def quick_task(n: int) -> int:
"""Быстрая задача, которая завершается немедленно."""
return n * 2
async def slow_task(n: int) -> int:
"""Медленная задача с await."""
await asyncio.sleep(0.1)
return n * 2
async def benchmark_eager_tasks():
"""
Демонстрация преимущества eager task factory.
Eager tasks выполняются синхронно если не требуют await,
что экономит overhead на создание Task объектов.
"""
import time
# Включаем eager task factory (Python 3.12+)
if sys.version_info >= (3, 12):
loop = asyncio.get_running_loop()
loop.set_task_factory(asyncio.eager_task_factory)
print("Eager task factory enabled")
# Тест с быстрыми задачами
start = time.perf_counter()
tasks = [asyncio.create_task(quick_task(i)) for i in range(10000)]
results = await asyncio.gather(*tasks)
eager_time = time.perf_counter() - start
print(f"10000 quick tasks: {eager_time:.3f}s")
print(f"First results: {results[:5]}")
if __name__ == "__main__":
asyncio.run(benchmark_eager_tasks())Когда использовать eager task factory:
- ✅ Много коротких задач без I/O
- ✅ CPU-bound микрооперации в async контексте
- ✅ Wrapping синхронных функций в async
- ❌ НЕ используйте для длинных I/O операций
Pattern matching для async операций (Python 3.10+):
import asyncio
from enum import Enum
from typing import Dict, Any
class ResponseStatus(Enum):
SUCCESS = 200
NOT_FOUND = 404
SERVER_ERROR = 500
TIMEOUT = 408
async def fetch_with_retry(url: str) -> Dict[str, Any]:
"""Fetch с pattern matching для обработки статусов."""
for attempt in range(3):
response = await fetch_url(url)
# Pattern matching для статусов
match response.status:
case ResponseStatus.SUCCESS:
return await response.json()
case ResponseStatus.NOT_FOUND:
raise ValueError(f"Resource not found: {url}")
case ResponseStatus.TIMEOUT:
if attempt < 2:
wait = 2 ** attempt
print(f"Timeout, retrying in {wait}s...")
await asyncio.sleep(wait)
continue
raise TimeoutError("Max retries exceeded")
case ResponseStatus.SERVER_ERROR:
print(f"Server error on attempt {attempt + 1}")
if attempt < 2:
await asyncio.sleep(1)
continue
raise RuntimeError("Server error after retries")
case _:
raise ValueError(f"Unexpected status: {response.status}")
async def fetch_url(url: str):
"""Mock fetch function."""
await asyncio.sleep(0.1)
# Simulate response
class Response:
status = ResponseStatus.SUCCESS
async def json(self):
return {"data": "success"}
return Response()Structural pattern matching для task results:
async def process_results(tasks: List[asyncio.Task]) -> None:
"""Обработка результатов задач с pattern matching."""
for task in tasks:
try:
result = await task
# Pattern matching по типу результата
match result:
case {"status": "ok", "data": data}:
print(f"✅ Success: {data}")
case {"status": "error", "code": code, "message": msg}:
print(f"❌ Error {code}: {msg}")
case {"status": "pending"}:
print("⏳ Still pending, will retry")
case None:
print("⚠️ No result")
case _:
print(f"❓ Unknown result format: {result}")
except asyncio.CancelledError:
print("🚫 Task was cancelled")
except Exception as e:
print(f"💥 Task failed: {e}")Детектирование deadlocks
import asyncio
from typing import Set
async def deadlock_detector(timeout: float = 5.0) -> None:
"""
Детектирует deadlock — ситуацию, когда все задачи ожидают друг друга.
Args:
timeout: Время в секундах без прогресса, после которого срабатывает alert
"""
last_task_count = 0
stall_time = 0.0
while True:
await asyncio.sleep(1)
tasks: Set[asyncio.Task] = asyncio.all_tasks()
current_count = len(tasks)
# Проверяем, изменилось ли количество задач
if current_count == last_task_count and current_count > 1:
stall_time += 1.0
if stall_time >= timeout:
print(f"⚠️ DEADLOCK DETECTED! {current_count} задач без прогресса {stall_time}s")
# Выводим информацию о всех задачах
for task in tasks:
if not task.done():
print(f" Ожидает: {task.get_coro().__name__}")
# В production здесь можно отправить alert
stall_time = 0 # Сброс для избежания спама
else:
stall_time = 0
last_task_count = current_count
# Пример использования
async def main() -> None:
# Запускаем детектор в фоне
detector = asyncio.create_task(deadlock_detector(timeout=3.0))
# Ваш код
await asyncio.sleep(10)
detector.cancel()
if __name__ == "__main__":
asyncio.run(main())Визуализация выполнения (aiodebug)
# pip install aiodebug
import asyncio
from aiodebug import log_slow_callbacks
async def main() -> None:
# Логирует все callback'и медленнее 50ms
with log_slow_callbacks.enable(0.05):
# Ваш код
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())Best practices для debugging
1. Используйте structured logging
import logging
import asyncio
from typing import Any
# Настройка логирования
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
async def fetch_data(url: str) -> dict:
"""Пример с логированием."""
logger.debug(f"Начало запроса: {url}")
try:
await asyncio.sleep(1) # Имитация запроса
logger.info(f"Успешный запрос: {url}")
return {"status": "ok"}
except Exception as e:
logger.error(f"Ошибка запроса {url}: {e}", exc_info=True)
raise2. Давайте задачам имена
import asyncio
async def worker(worker_id: int) -> None:
await asyncio.sleep(1)
async def main() -> None:
# ✅ С именами легче дебажить
tasks = [
asyncio.create_task(worker(i), name=f"worker-{i}")
for i in range(5)
]
# В логах будет видно "worker-3" вместо "<Task pending>"
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())3. Используйте context variables для трейсинга
import asyncio
from contextvars import ContextVar
from typing import Optional
import uuid
# Context variable для request ID
request_id: ContextVar[Optional[str]] = ContextVar('request_id', default=None)
async def process_request() -> None:
"""Обработка запроса с трейсингом."""
# Устанавливаем unique ID для запроса
req_id = str(uuid.uuid4())
request_id.set(req_id)
print(f"[{request_id.get()}] Начало обработки")
await asyncio.sleep(0.1)
print(f"[{request_id.get()}] Завершение обработки")
async def main() -> None:
# Каждая задача сохраняет свой context
await asyncio.gather(
process_request(),
process_request(),
process_request()
)
if __name__ == "__main__":
asyncio.run(main())Тестирование asyncio кода
Тестирование критично: Asyncio код сложнее тестировать из-за асинхронности
и race conditions. Используйте pytest-asyncio, моки для внешних зависимостей
и фикстуры для изоляции тестов.
Настройка pytest-asyncio
pip install pytest pytest-asyncio pytest-cov# conftest.py
import pytest
import asyncio
# Включаем asyncio mode для всех тестов
def pytest_configure(config):
config.addinivalue_line("markers", "asyncio: mark test as async")
@pytest.fixture(scope="session")
def event_loop():
"""Создаём event loop для всей сессии тестов."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()Базовые тесты asyncio функций
# test_async_functions.py
import pytest
import asyncio
from typing import List
# Функции для тестирования
async def fetch_data(url: str) -> dict:
"""Имитация HTTP запроса."""
await asyncio.sleep(0.1)
return {"url": url, "status": 200}
async def process_urls(urls: List[str]) -> List[dict]:
"""Параллельная обработка URLs."""
tasks = [fetch_data(url) for url in urls]
return await asyncio.gather(*tasks)
# Тесты
@pytest.mark.asyncio
async def test_fetch_data():
"""Тест одиночного запроса."""
result = await fetch_data("https://example.com")
assert result["url"] == "https://example.com"
assert result["status"] == 200
@pytest.mark.asyncio
async def test_process_urls():
"""Тест параллельной обработки."""
urls = ["https://example.com", "https://test.com"]
results = await process_urls(urls)
assert len(results) == 2
assert all(r["status"] == 200 for r in results)
@pytest.mark.asyncio
async def test_process_urls_empty():
"""Тест граничного случая."""
results = await process_urls([])
assert results == []Моки для асинхронных функций
import pytest
from unittest.mock import AsyncMock, patch
import aiohttp
async def fetch_user(user_id: int) -> dict:
"""Реальная функция с HTTP запросом."""
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/users/{user_id}") as resp:
return await resp.json()
@pytest.mark.asyncio
async def test_fetch_user_with_mock():
"""Тест с моком aiohttp."""
mock_response = AsyncMock()
mock_response.json.return_value = {"id": 1, "name": "John"}
with patch("aiohttp.ClientSession.get") as mock_get:
mock_get.return_value.__aenter__.return_value = mock_response
result = await fetch_user(1)
assert result["id"] == 1
assert result["name"] == "John"
mock_get.assert_called_once()Тестирование таймаутов и ошибок
import pytest
import asyncio
async def slow_operation(delay: float) -> str:
"""Медленная операция."""
await asyncio.sleep(delay)
return "done"
@pytest.mark.asyncio
async def test_timeout_error():
"""Тест таймаута."""
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(2.0), timeout=0.5)
@pytest.mark.asyncio
async def test_cancellation():
"""Тест отмены задачи."""
task = asyncio.create_task(slow_operation(10.0))
await asyncio.sleep(0.1)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await taskBest practices для тестирования
1. Изолируйте тесты
@pytest.fixture(autouse=True)
async def reset_state():
"""Автоматически сбрасываем состояние между тестами."""
# Setup
await clear_cache()
yield
# Teardown
await cleanup_resources()2. Используйте моки для внешних сервисов
# ❌ НЕ делайте реальные HTTP запросы в тестах
@pytest.mark.asyncio
async def test_bad():
result = await fetch("https://real-api.com") # Медленно и ненадёжно
# ✅ Мокайте внешние зависимости
@pytest.mark.asyncio
async def test_good(mocker):
mocker.patch("your_module.fetch", return_value={"status": "ok"})
result = await your_function()
assert result["status"] == "ok"Memory Leaks и оптимизация памяти
Критическая проблема в production: Утечки памяти в asyncio приложениях сложно детектировать, но легко создать. Незавершенные задачи, циклические ссылки и кэши без ограничений — основные источники проблем.
Типичные источники утечек памяти
1. Незавершенные задачи
import asyncio
from typing import Set
# ❌ УТЕЧКА: задача создана, но никогда не awaited
async def memory_leak():
while True:
# Создаём задачу и забываем про неё
asyncio.create_task(work())
await asyncio.sleep(1)
# ✅ ПРАВИЛЬНО: храним ссылки и очищаем
async def no_leak():
tasks: Set[asyncio.Task] = set()
async def cleanup_done():
"""Периодически очищаем завершённые задачи."""
while True:
tasks.difference_update({t for t in tasks if t.done()})
await asyncio.sleep(10)
asyncio.create_task(cleanup_done())
while True:
task = asyncio.create_task(work())
tasks.add(task)
await asyncio.sleep(1)
async def work() -> None:
await asyncio.sleep(0.1)2. Циклические ссылки в корутинах
import asyncio
import weakref
from typing import Optional
class Resource:
def __init__(self, name: str):
self.name = name
self.task: Optional[asyncio.Task] = None
async def start(self) -> None:
# ❌ УТЕЧКА: task ссылается на self, self ссылается на task
self.task = asyncio.create_task(self.run())
async def run(self) -> None:
while True:
await asyncio.sleep(1)
# ✅ ПРАВИЛЬНО: используем weak reference
async def start_safe(self) -> None:
"""Безопасный запуск без циклических ссылок."""
weak_self = weakref.ref(self)
async def run_weak():
while True:
obj = weak_self()
if obj is None:
break # Объект удалён
# Работа
await asyncio.sleep(1)
self.task = asyncio.create_task(run_weak())3. Кэши без ограничений
import asyncio
from functools import lru_cache
from typing import Dict, Any
import time
# ❌ УТЕЧКА: кэш растёт бесконечно
_cache: Dict[str, Any] = {}
async def fetch_with_leak(key: str) -> Any:
if key not in _cache:
_cache[key] = await expensive_operation(key)
return _cache[key]
# ✅ ПРАВИЛЬНО: LRU cache с ограничением
@lru_cache(maxsize=1000)
def sync_cached_function(key: str) -> Any:
return expensive_sync_operation(key)
# Для async функций используем custom реализацию с TTL
class AsyncLRUCache:
def __init__(self, maxsize: int = 1000, ttl: float = 3600):
self.cache: Dict[str, tuple[Any, float]] = {}
self.maxsize = maxsize
self.ttl = ttl
async def get(self, key: str, fetch_fn) -> Any:
# Проверяем наличие и TTL
if key in self.cache:
value, timestamp = self.cache[key]
if time.time() - timestamp < self.ttl:
return value
# Fetch и сохраняем
value = await fetch_fn(key)
# Очищаем старые записи если превышен лимит
if len(self.cache) >= self.maxsize:
oldest = min(self.cache.items(), key=lambda x: x[1][1])
del self.cache[oldest[0]]
self.cache[key] = (value, time.time())
return value
async def expensive_operation(key: str) -> Any:
await asyncio.sleep(0.1)
return f"result_{key}"
def expensive_sync_operation(key: str) -> Any:
return f"sync_result_{key}"Детектирование утечек памяти
1. Мониторинг количества задач
import asyncio
import psutil
import os
from typing import Dict
async def memory_monitor(interval: float = 5.0) -> None:
"""Мониторинг использования памяти и количества задач."""
process = psutil.Process(os.getpid())
while True:
# Текущее использование памяти
mem_info = process.memory_info()
rss_mb = mem_info.rss / 1024 / 1024
# Количество задач
tasks = asyncio.all_tasks()
active_tasks = len([t for t in tasks if not t.done()])
# Логируем
print(f"Memory: {rss_mb:.1f}MB | Active tasks: {active_tasks}")
# Alert если превышены пороги
if rss_mb > 500: # 500MB
print(f"⚠️ HIGH MEMORY USAGE: {rss_mb:.1f}MB")
if active_tasks > 1000:
print(f"⚠️ TOO MANY TASKS: {active_tasks}")
await asyncio.sleep(interval)2. Трекинг создания задач
import asyncio
import traceback
from collections import defaultdict
from typing import Dict, List
class TaskTracker:
"""Отслеживает создание задач для поиска утечек."""
def __init__(self):
self.task_origins: Dict[asyncio.Task, str] = {}
self.creation_counts: Dict[str, int] = defaultdict(int)
def track_task(self, task: asyncio.Task) -> None:
"""Сохраняет место создания задачи."""
stack = "".join(traceback.format_stack()[:-1])
self.task_origins[task] = stack
self.creation_counts[stack] += 1
def report(self) -> None:
"""Выводит отчёт о задачах."""
active = {t: origin for t, origin in self.task_origins.items() if not t.done()}
print(f"\n=== Task Leak Report ===")
print(f"Active tasks: {len(active)}")
print(f"\nTop task creation sites:")
# Сортируем по количеству созданий
sorted_counts = sorted(
self.creation_counts.items(),
key=lambda x: x[1],
reverse=True
)[:5]
for stack, count in sorted_counts:
print(f"\n{count} tasks created at:")
print(stack[:200]) # Первые 200 символов стека
# Использование
tracker = TaskTracker()
def create_task_tracked(coro):
"""Обёртка для asyncio.create_task с трекингом."""
task = asyncio.create_task(coro)
tracker.track_task(task)
return task3. Использование tracemalloc
import asyncio
import tracemalloc
from typing import List, Tuple
async def memory_profiling_example() -> None:
"""Пример профилирования памяти."""
# Включаем tracemalloc
tracemalloc.start()
# Делаем снимок ДО
snapshot1 = tracemalloc.take_snapshot()
# Выполняем код, который может иметь утечки
tasks = []
for i in range(1000):
tasks.append(asyncio.create_task(work()))
await asyncio.gather(*tasks)
# Делаем снимок ПОСЛЕ
snapshot2 = tracemalloc.take_snapshot()
# Сравниваем
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print("[ Top 10 memory increases ]")
for stat in top_stats[:10]:
print(stat)
tracemalloc.stop()
async def work() -> None:
await asyncio.sleep(0.01)Best practices для предотвращения утечек
1. Всегда завершайте задачи
import asyncio
from typing import Set
class Application:
def __init__(self):
self.tasks: Set[asyncio.Task] = set()
def create_task(self, coro):
"""Создаёт задачу и сохраняет ссылку."""
task = asyncio.create_task(coro)
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
return task
async def shutdown(self):
"""Корректное завершение всех задач."""
# Отменяем все задачи
for task in self.tasks:
task.cancel()
# Ждём завершения
await asyncio.gather(*self.tasks, return_exceptions=True)2. Используйте context managers
import asyncio
from typing import Set
class TaskGroup:
"""Context manager для группы задач."""
def __init__(self):
self.tasks: Set[asyncio.Task] = set()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Отменяем все задачи при выходе
for task in self.tasks:
if not task.done():
task.cancel()
# Ждём завершения
await asyncio.gather(*self.tasks, return_exceptions=True)
def create_task(self, coro):
"""Создаёт задачу в группе."""
task = asyncio.create_task(coro)
self.tasks.add(task)
return task
# Использование
async def main():
async with TaskGroup() as group:
for i in range(10):
group.create_task(worker(i))
# Все задачи автоматически отменятся при выходе
async def worker(worker_id: int):
await asyncio.sleep(1)3. Ограничивайте количество задач
import asyncio
async def process_items_bounded(items, max_concurrent=10):
"""Обработка с ограничением параллелизма."""
semaphore = asyncio.Semaphore(max_concurrent)
async def process_with_semaphore(item):
async with semaphore:
return await process_item(item)
tasks = [process_with_semaphore(item) for item in items]
return await asyncio.gather(*tasks)
async def process_item(item):
await asyncio.sleep(0.1)
return f"processed_{item}"Антипаттерны и Code Review Checklist
Code Review обязателен: Asyncio код требует особого внимания при ревью. Многие ошибки проявляются только в production под нагрузкой.
Топ-10 антипаттернов
1. ❌ Забытый await
# ❌ НЕПРАВИЛЬНО
async def bad():
result = fetch_data() # Вернёт coroutine объект!
print(result) # <coroutine object fetch_data>
# ✅ ПРАВИЛЬНО
async def good():
result = await fetch_data()
print(result)2. ❌ Блокирующие операции в event loop
import time
import requests
# ❌ НЕПРАВИЛЬНО: блокирует event loop
async def bad():
time.sleep(1) # Останавливает ВСЁ
response = requests.get("https://api.com") # Блокирующий запрос
# ✅ ПРАВИЛЬНО: используем асинхронные аналоги
async def good():
await asyncio.sleep(1)
async with aiohttp.ClientSession() as session:
async with session.get("https://api.com") as resp:
data = await resp.json()3. ❌ Создание множества event loops
# ❌ НЕПРАВИЛЬНО
async def bad():
asyncio.run(some_coro()) # RuntimeError!
# ✅ ПРАВИЛЬНО
async def good():
await some_coro()
# asyncio.run() только в main
if __name__ == "__main__":
asyncio.run(good())4. ❌ Отсутствие обработки ошибок в задачах
# ❌ НЕПРАВИЛЬНО: исключение проглатывается
async def bad():
task = asyncio.create_task(risky_operation())
# Если не сделать await, исключение потеряется
# ✅ ПРАВИЛЬНО: всегда обрабатывайте исключения
async def good():
task = asyncio.create_task(risky_operation())
try:
result = await task
except Exception as e:
logger.error(f"Task failed: {e}")5. ❌ Race condition без синхронизации
# ❌ НЕПРАВИЛЬНО
class Counter:
def __init__(self):
self.value = 0
async def increment(self):
temp = self.value
await asyncio.sleep(0.001) # ← Race condition!
self.value = temp + 1
# ✅ ПРАВИЛЬНО
class CounterSafe:
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
async with self.lock:
temp = self.value
await asyncio.sleep(0.001)
self.value = temp + 16. ❌ Отсутствие таймаутов
# ❌ НЕПРАВИЛЬНО: может зависнуть навсегда
async def bad():
response = await fetch_external_api()
# ✅ ПРАВИЛЬНО: всегда устанавливайте таймауты
async def good():
try:
response = await asyncio.wait_for(
fetch_external_api(),
timeout=5.0
)
except asyncio.TimeoutError:
logger.error("API timeout")
raise7. ❌ Незавершенные задачи при shutdown
# ❌ НЕПРАВИЛЬНО
async def bad():
tasks = [asyncio.create_task(worker(i)) for i in range(10)]
# Приложение завершается, задачи остаются незавершёнными
# ✅ ПРАВИЛЬНО: graceful shutdown
async def good():
tasks = [asyncio.create_task(worker(i)) for i in range(10)]
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
# Отменяем задачи
for task in tasks:
task.cancel()
# Ждём завершения
await asyncio.gather(*tasks, return_exceptions=True)8. ❌ Игнорирование return_exceptions
# ❌ НЕПРАВИЛЬНО: одна ошибка ломает всё
async def bad():
results = await asyncio.gather(
fetch(url1),
fetch(url2),
fetch(url3) # Если здесь ошибка, всё падает
)
# ✅ ПРАВИЛЬНО: обрабатываем ошибки индивидуально
async def good():
results = await asyncio.gather(
fetch(url1),
fetch(url2),
fetch(url3),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Task {i} failed: {result}")9. ❌ Shared mutable state без защиты
# ❌ НЕПРАВИЛЬНО
cache = {}
async def bad(key):
if key not in cache:
cache[key] = await fetch(key) # ← Race condition!
return cache[key]
# ✅ ПРАВИЛЬНО
class SafeCache:
def __init__(self):
self.cache = {}
self.locks = {}
self.main_lock = asyncio.Lock()
async def get(self, key):
# Получаем lock для конкретного ключа
async with self.main_lock:
if key not in self.locks:
self.locks[key] = asyncio.Lock()
lock = self.locks[key]
# Используем lock для ключа
async with lock:
if key not in self.cache:
self.cache[key] = await fetch(key)
return self.cache[key]10. ❌ Отсутствие type hints
# ❌ НЕПРАВИЛЬНО
async def process(data):
result = await transform(data)
return result
# ✅ ПРАВИЛЬНО
from typing import Dict, List, Any
async def process(data: List[Dict[str, Any]]) -> List[str]:
result: List[str] = await transform(data)
return resultCode Review Checklist
Базовый уровень:
- Все async функции используются с
await - Нет блокирующих операций (
time.sleep, синхронные I/O) asyncio.run()только в точке входа- Все задачи имеют обработку ошибок
- Используются type hints
- Есть docstrings для public API
Продвинутый уровень:
- Установлены таймауты для всех внешних вызовов
- Используется
return_exceptions=Trueвgather()где нужно - Shared state защищён примитивами синхронизации
- Задачи корректно завершаются при shutdown
- Нет утечек памяти (проверено с tracemalloc)
- Есть graceful shutdown
- Логирование с context (request ID, trace ID)
Production-ready:
- Мониторинг метрик (количество задач, память, latency)
- Health checks для зависимостей
- Circuit breaker для внешних сервисов
- Rate limiting где необходимо
- Написаны тесты (unit + integration)
- Документация по развёртыванию
- Runbook для операторов
- Алерты настроены
Автоматические проверки
pre-commit hook:
# .pre-commit-config.yaml
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.0
hooks:
- id: ruff
args: [--fix]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.7.0
hooks:
- id: mypy
additional_dependencies: [types-all]mypy configuration:
# mypy.ini
[mypy]
python_version = 3.11
strict = True
warn_return_any = True
warn_unused_configs = True
# Для asyncio
plugins = pydantic.mypyСквозной пример: Production-Ready API сервис
Комплексный пример: Этот сервис демонстрирует все best practices — graceful shutdown, мониторинг, обработку ошибок, тестирование. Используйте как шаблон для своих проектов.
Архитектура сервиса
api_service/
├── main.py # Точка входа
├── config.py # Конфигурация
├── api/
│ ├── __init__.py
│ ├── handlers.py # HTTP handlers
│ └── middleware.py # Middleware
├── services/
│ ├── __init__.py
│ ├── cache.py # Кэш с TTL
│ ├── database.py # Async БД
│ └── external_api.py # Внешние API
├── utils/
│ ├── __init__.py
│ ├── monitoring.py # Метрики
│ └── logging_config.py
└── tests/
├── __init__.py
├── test_handlers.py
└── test_services.py
1. Конфигурация (config.py)
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
"""Конфигурация приложения."""
# Server
host: str = "0.0.0.0"
port: int = 8000
workers: int = 4
# Database
database_url: str = "postgresql://user:pass@localhost/dbname"
db_pool_min_size: int = 10
db_pool_max_size: int = 20
# Cache
redis_url: str = "redis://localhost:6379"
cache_ttl: int = 300 # seconds
# External API
external_api_url: str = "https://api.example.com"
external_api_timeout: float = 5.0
external_api_max_retries: int = 3
# Monitoring
enable_metrics: bool = True
metrics_port: int = 9090
# Logging
log_level: str = "INFO"
class Config:
env_file = ".env"
settings = Settings()2. Database Service (services/database.py)
import asyncpg
import asyncio
from typing import List, Dict, Any, Optional
from contextlib import asynccontextmanager
import logging
logger = logging.getLogger(__name__)
class DatabaseService:
"""Сервис для работы с PostgreSQL через asyncpg."""
def __init__(self, database_url: str, min_size: int = 10, max_size: int = 20):
self.database_url = database_url
self.min_size = min_size
self.max_size = max_size
self.pool: Optional[asyncpg.Pool] = None
async def connect(self) -> None:
"""Создание connection pool."""
logger.info("Connecting to database...")
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=self.min_size,
max_size=self.max_size,
command_timeout=60
)
logger.info("Database connected")
async def disconnect(self) -> None:
"""Закрытие connection pool."""
if self.pool:
logger.info("Closing database connections...")
await self.pool.close()
logger.info("Database disconnected")
@asynccontextmanager
async def connection(self):
"""Context manager для получения соединения из pool."""
if not self.pool:
raise RuntimeError("Database not connected")
async with self.pool.acquire() as conn:
yield conn
async def fetch_user(self, user_id: int) -> Optional[Dict[str, Any]]:
"""Получение пользователя по ID."""
async with self.connection() as conn:
row = await conn.fetchrow(
"SELECT id, name, email FROM users WHERE id = $1",
user_id
)
return dict(row) if row else None
async def fetch_users(self, limit: int = 100) -> List[Dict[str, Any]]:
"""Получение списка пользователей."""
async with self.connection() as conn:
rows = await conn.fetch(
"SELECT id, name, email FROM users LIMIT $1",
limit
)
return [dict(row) for row in rows]
async def create_user(self, name: str, email: str) -> int:
"""Создание нового пользователя."""
async with self.connection() as conn:
user_id = await conn.fetchval(
"INSERT INTO users(name, email) VALUES($1, $2) RETURNING id",
name,
email
)
return user_id
async def health_check(self) -> bool:
"""Проверка здоровья БД."""
try:
async with self.connection() as conn:
await conn.fetchval("SELECT 1")
return True
except Exception as e:
logger.error(f"Database health check failed: {e}")
return False3. Cache Service (services/cache.py)
import asyncio
from typing import Optional, Any
import json
import time
import logging
logger = logging.getLogger(__name__)
class AsyncCache:
"""In-memory кэш с TTL и LRU eviction."""
def __init__(self, max_size: int = 1000, default_ttl: float = 300):
self.cache: dict[str, tuple[Any, float]] = {}
self.max_size = max_size
self.default_ttl = default_ttl
self.lock = asyncio.Lock()
self.hits = 0
self.misses = 0
async def get(self, key: str) -> Optional[Any]:
"""Получение значения из кэша."""
async with self.lock:
if key in self.cache:
value, expires_at = self.cache[key]
# Проверяем TTL
if time.time() < expires_at:
self.hits += 1
return value
else:
# Удаляем просроченное значение
del self.cache[key]
self.misses += 1
return None
async def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
"""Сохранение значения в кэш."""
async with self.lock:
# Очищаем старые записи если достигнут лимит
if len(self.cache) >= self.max_size:
await self._evict_oldest()
expires_at = time.time() + (ttl or self.default_ttl)
self.cache[key] = (value, expires_at)
async def delete(self, key: str) -> None:
"""Удаление ключа из кэша."""
async with self.lock:
self.cache.pop(key, None)
async def clear(self) -> None:
"""Очистка всего кэша."""
async with self.lock:
self.cache.clear()
async def _evict_oldest(self) -> None:
"""Удаление самой старой записи."""
if not self.cache:
return
oldest_key = min(self.cache.items(), key=lambda x: x[1][1])[0]
del self.cache[oldest_key]
async def get_stats(self) -> dict:
"""Получение статистики кэша."""
async with self.lock:
total = self.hits + self.misses
hit_rate = (self.hits / total * 100) if total > 0 else 0
return {
"size": len(self.cache),
"max_size": self.max_size,
"hits": self.hits,
"misses": self.misses,
"hit_rate": f"{hit_rate:.1f}%"
}4. External API Client (services/external_api.py)
import asyncio
import aiohttp
from typing import Optional, Dict, Any
import logging
from functools import wraps
logger = logging.getLogger(__name__)
def retry_on_failure(max_retries: int = 3, backoff: float = 1.0):
"""Декоратор для retry с exponential backoff."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
wait_time = backoff * (2 ** attempt)
logger.warning(
f"{func.__name__} failed (attempt {attempt + 1}/{max_retries}), "
f"retrying in {wait_time}s: {e}"
)
await asyncio.sleep(wait_time)
logger.error(f"{func.__name__} failed after {max_retries} attempts")
raise last_exception
return wrapper
return decorator
class ExternalAPIClient:
"""Клиент для внешнего API с retry и circuit breaker."""
def __init__(
self,
base_url: str,
timeout: float = 5.0,
max_retries: int = 3
):
self.base_url = base_url
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.max_retries = max_retries
self.session: Optional[aiohttp.ClientSession] = None
# Circuit breaker state
self.failure_count = 0
self.failure_threshold = 5
self.is_open = False
self.last_failure_time: Optional[float] = None
self.reset_timeout = 60.0 # seconds
async def connect(self) -> None:
"""Создание HTTP сессии."""
self.session = aiohttp.ClientSession(timeout=self.timeout)
logger.info("External API client connected")
async def disconnect(self) -> None:
"""Закрытие HTTP сессии."""
if self.session:
await self.session.close()
logger.info("External API client disconnected")
async def _check_circuit_breaker(self) -> None:
"""Проверка состояния circuit breaker."""
if self.is_open:
# Проверяем, можно ли попробовать снова
if self.last_failure_time and \
time.time() - self.last_failure_time > self.reset_timeout:
logger.info("Circuit breaker: trying half-open state")
self.is_open = False
self.failure_count = 0
else:
raise Exception("Circuit breaker is OPEN")
async def _record_success(self) -> None:
"""Регистрация успешного запроса."""
self.failure_count = 0
if self.is_open:
logger.info("Circuit breaker: closed")
self.is_open = False
async def _record_failure(self) -> None:
"""Регистрация неудачного запроса."""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
logger.error("Circuit breaker: OPEN")
self.is_open = True
@retry_on_failure(max_retries=3)
async def fetch_data(self, endpoint: str) -> Dict[str, Any]:
"""Получение данных из API."""
await self._check_circuit_breaker()
if not self.session:
raise RuntimeError("Client not connected")
url = f"{self.base_url}/{endpoint}"
try:
async with self.session.get(url) as response:
response.raise_for_status()
data = await response.json()
await self._record_success()
return data
except Exception as e:
await self._record_failure()
logger.error(f"API request failed: {e}")
raise
async def health_check(self) -> bool:
"""Проверка здоровья API."""
try:
await self.fetch_data("health")
return True
except Exception:
return False5. HTTP Handlers (api/handlers.py)
from aiohttp import web
from typing import Dict, Any
import logging
import time
logger = logging.getLogger(__name__)
class APIHandlers:
"""HTTP handlers для API endpoints."""
def __init__(self, db, cache, external_api):
self.db = db
self.cache = cache
self.external_api = external_api
async def get_user(self, request: web.Request) -> web.Response:
"""
GET /users/{user_id}
Получение пользователя по ID с кэшированием.
"""
start_time = time.time()
user_id = int(request.match_info['user_id'])
# Проверяем кэш
cache_key = f"user:{user_id}"
cached = await self.cache.get(cache_key)
if cached:
logger.info(f"Cache HIT for user {user_id}")
return web.json_response(cached)
# Запрашиваем из БД
logger.info(f"Cache MISS for user {user_id}")
user = await self.db.fetch_user(user_id)
if not user:
return web.json_response(
{"error": "User not found"},
status=404
)
# Сохраняем в кэш
await self.cache.set(cache_key, user, ttl=300)
duration = time.time() - start_time
logger.info(f"get_user completed in {duration:.3f}s")
return web.json_response(user)
async def list_users(self, request: web.Request) -> web.Response:
"""
GET /users
Получение списка пользователей.
"""
try:
limit = int(request.query.get('limit', '100'))
users = await self.db.fetch_users(limit=limit)
return web.json_response({
"users": users,
"count": len(users)
})
except Exception as e:
logger.error(f"list_users failed: {e}", exc_info=True)
return web.json_response(
{"error": "Internal server error"},
status=500
)
async def create_user(self, request: web.Request) -> web.Response:
"""
POST /users
Создание нового пользователя.
"""
try:
data = await request.json()
# Валидация
if 'name' not in data or 'email' not in data:
return web.json_response(
{"error": "name and email are required"},
status=400
)
# Создаём пользователя
user_id = await self.db.create_user(
name=data['name'],
email=data['email']
)
return web.json_response(
{"id": user_id, "status": "created"},
status=201
)
except Exception as e:
logger.error(f"create_user failed: {e}", exc_info=True)
return web.json_response(
{"error": "Internal server error"},
status=500
)
async def health(self, request: web.Request) -> web.Response:
"""
GET /health
Health check endpoint.
"""
db_healthy = await self.db.health_check()
api_healthy = await self.external_api.health_check()
status = "healthy" if (db_healthy and api_healthy) else "unhealthy"
status_code = 200 if status == "healthy" else 503
return web.json_response({
"status": status,
"checks": {
"database": "ok" if db_healthy else "failed",
"external_api": "ok" if api_healthy else "failed"
}
}, status=status_code)
async def metrics(self, request: web.Request) -> web.Response:
"""
GET /metrics
Метрики приложения.
"""
cache_stats = await self.cache.get_stats()
return web.json_response({
"cache": cache_stats,
"database": {
"pool_size": self.db.pool.get_size() if self.db.pool else 0
}
})6. Main Application (main.py)
import asyncio
import signal
from aiohttp import web
import logging
from config import settings
from services.database import DatabaseService
from services.cache import AsyncCache
from services.external_api import ExternalAPIClient
from api.handlers import APIHandlers
# Настройка логирования
logging.basicConfig(
level=settings.log_level,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
class Application:
"""Главное приложение."""
def __init__(self):
self.app: Optional[web.Application] = None
self.runner: Optional[web.AppRunner] = None
self.site: Optional[web.TCPSite] = None
# Services
self.db = DatabaseService(
settings.database_url,
min_size=settings.db_pool_min_size,
max_size=settings.db_pool_max_size
)
self.cache = AsyncCache()
self.external_api = ExternalAPIClient(
settings.external_api_url,
timeout=settings.external_api_timeout,
max_retries=settings.external_api_max_retries
)
# Shutdown event
self.shutdown_event = asyncio.Event()
async def startup(self) -> None:
"""Инициализация приложения."""
logger.info("Starting application...")
# Подключаем сервисы
await self.db.connect()
await self.external_api.connect()
# Создаём web app
self.app = web.Application()
# Регистрируем handlers
handlers = APIHandlers(self.db, self.cache, self.external_api)
self.app.router.add_get('/users/{user_id}', handlers.get_user)
self.app.router.add_get('/users', handlers.list_users)
self.app.router.add_post('/users', handlers.create_user)
self.app.router.add_get('/health', handlers.health)
self.app.router.add_get('/metrics', handlers.metrics)
# Запускаем сервер
self.runner = web.AppRunner(self.app)
await self.runner.setup()
self.site = web.TCPSite(
self.runner,
settings.host,
settings.port
)
await self.site.start()
logger.info(f"Server started on {settings.host}:{settings.port}")
async def shutdown(self) -> None:
"""Graceful shutdown."""
logger.info("Shutting down...")
# Останавливаем сервер
if self.site:
await self.site.stop()
if self.runner:
await self.runner.cleanup()
# Закрываем сервисы
await self.external_api.disconnect()
await self.db.disconnect()
logger.info("Shutdown complete")
async def run(self) -> None:
"""Запуск приложения."""
# Startup
await self.startup()
# Ждём сигнала завершения
await self.shutdown_event.wait()
# Shutdown
await self.shutdown()
def handle_signal(self, sig) -> None:
"""Обработка сигналов завершения."""
logger.info(f"Received signal {sig}")
self.shutdown_event.set()
async def main() -> None:
"""Точка входа."""
app = Application()
# Регистрируем обработчики сигналов
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: app.handle_signal(s))
# Запускаем приложение
await app.run()
if __name__ == "__main__":
asyncio.run(main())7. Тесты (tests/test_handlers.py)
import pytest
from aiohttp import web
from unittest.mock import AsyncMock
from api.handlers import APIHandlers
@pytest.fixture
def mock_db():
"""Mock базы данных."""
db = AsyncMock()
db.fetch_user.return_value = {"id": 1, "name": "John", "email": "john@example.com"}
db.fetch_users.return_value = [{"id": 1, "name": "John"}]
db.create_user.return_value = 1
db.health_check.return_value = True
return db
@pytest.fixture
def mock_cache():
"""Mock кэша."""
cache = AsyncMock()
cache.get.return_value = None
cache.set.return_value = None
cache.get_stats.return_value = {"size": 0, "hits": 0}
return cache
@pytest.fixture
def mock_external_api():
"""Mock внешнего API."""
api = AsyncMock()
api.health_check.return_value = True
return api
@pytest.fixture
def handlers(mock_db, mock_cache, mock_external_api):
"""Создание handlers с моками."""
return APIHandlers(mock_db, mock_cache, mock_external_api)
@pytest.mark.asyncio
async def test_get_user_from_db(handlers, mock_db, mock_cache):
"""Тест получения пользователя из БД."""
# Создаём mock request
request = AsyncMock()
request.match_info = {'user_id': '1'}
# Вызываем handler
response = await handlers.get_user(request)
# Проверки
assert response.status == 200
mock_db.fetch_user.assert_called_once_with(1)
mock_cache.get.assert_called_once()
mock_cache.set.assert_called_once()
@pytest.mark.asyncio
async def test_get_user_from_cache(handlers, mock_db, mock_cache):
"""Тест получения пользователя из кэша."""
# Настраиваем mock cache
mock_cache.get.return_value = {"id": 1, "name": "John"}
request = AsyncMock()
request.match_info = {'user_id': '1'}
response = await handlers.get_user(request)
# Проверяем, что БД не вызывалась
assert response.status == 200
mock_db.fetch_user.assert_not_called()
@pytest.mark.asyncio
async def test_create_user(handlers, mock_db):
"""Тест создания пользователя."""
request = AsyncMock()
request.json.return_value = {"name": "John", "email": "john@example.com"}
response = await handlers.create_user(request)
assert response.status == 201
mock_db.create_user.assert_called_once_with(
name="John",
email="john@example.com"
)
@pytest.mark.asyncio
async def test_health_check(handlers):
"""Тест health check endpoint."""
request = AsyncMock()
response = await handlers.health(request)
assert response.status == 200Запуск сервиса
# Установка зависимостей
pip install aiohttp asyncpg pydantic-settings pytest pytest-asyncio
# Запуск
python main.py
# Тестирование
pytest tests/ -v
# Проверка health
curl http://localhost:8000/health
# Получение пользователя
curl http://localhost:8000/users/1
# Метрики
curl http://localhost:8000/metricsКлючевые особенности примера
Production-ready features:
- ✅ Graceful shutdown — корректное завершение при SIGTERM/SIGINT
- ✅ Connection pooling — для БД и HTTP клиентов
- ✅ Кэширование — с TTL и LRU eviction
- ✅ Retry logic — с exponential backoff
- ✅ Circuit breaker — защита от каскадных сбоев
- ✅ Health checks — для мониторинга
- ✅ Structured logging — с контекстом
- ✅ Type hints — везде
- ✅ Тестирование — unit tests с моками
- ✅ Конфигурация — через env variables
Этот пример можно использовать как основу для реальных проектов!
Security Best Practices для Asyncio
Критично для production: Asyncio приложения подвержены специфическим уязвимостям — DoS через неограниченные задачи, injection через неправильную обработку ввода, утечки данных через race conditions.
1. Защита от DoS атак
Проблема: Неограниченное создание задач может исчерпать память
import asyncio
from typing import Set
import logging
logger = logging.getLogger(__name__)
class TaskLimiter:
"""Защита от DoS через ограничение количества задач."""
def __init__(self, max_tasks: int = 1000):
self.max_tasks = max_tasks
self.active_tasks: Set[asyncio.Task] = set()
self.semaphore = asyncio.Semaphore(max_tasks)
async def create_task(self, coro, name: str = None) -> asyncio.Task:
"""
Создает задачу с контролем лимита.
Raises:
RuntimeError: Если достигнут лимит задач
"""
if len(self.active_tasks) >= self.max_tasks:
raise RuntimeError(f"Task limit exceeded: {self.max_tasks}")
task = asyncio.create_task(coro, name=name)
self.active_tasks.add(task)
# Автоматически удаляем из set при завершении
task.add_done_callback(self.active_tasks.discard)
return task
async def wait_available(self) -> None:
"""Ждет пока появится слот для новой задачи."""
async with self.semaphore:
pass
# Использование
limiter = TaskLimiter(max_tasks=100)
async def handle_request(request):
"""Handler с защитой от DoS."""
try:
task = await limiter.create_task(
process_request(request),
name=f"request-{request.id}"
)
return await task
except RuntimeError:
# Слишком много задач — возвращаем 503
return {"error": "Service temporarily unavailable"}, 5032. Безопасная работа с HTTP
import asyncio
import aiohttp
from ssl import create_default_context, CERT_REQUIRED
from typing import Optional, Dict, Any
import logging
logger = logging.getLogger(__name__)
class SecureHTTPClient:
"""HTTP клиент с security best practices."""
def __init__(
self,
max_connections: int = 100,
max_per_host: int = 10,
timeout: float = 30.0
):
# SSL контекст с проверкой сертификатов
ssl_context = create_default_context()
ssl_context.check_hostname = True
ssl_context.verify_mode = CERT_REQUIRED
# Таймауты для всех операций
self.timeout = aiohttp.ClientTimeout(
total=timeout,
connect=5,
sock_read=10
)
# Ограничение подключений (защита от DoS)
self.connector = aiohttp.TCPConnector(
ssl=ssl_context,
limit=max_connections,
limit_per_host=max_per_host,
ttl_dns_cache=300
)
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
connector=self.connector,
timeout=self.timeout
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def fetch(
self,
url: str,
max_size: int = 10 * 1024 * 1024, # 10MB
headers: Optional[Dict[str, str]] = None
) -> bytes:
"""
Безопасный HTTP fetch с защитой от атак.
Args:
url: URL для запроса
max_size: Максимальный размер ответа (защита от zip bombs)
headers: Дополнительные заголовки
Returns:
Тело ответа
Raises:
SecurityError: Если ответ слишком большой
ValueError: Если URL небезопасный
"""
# Валидация URL
if not url.startswith(('https://', 'http://')):
raise ValueError(f"Unsafe URL scheme: {url}")
# Безопасные заголовки по умолчанию
safe_headers = {
"User-Agent": "SecureClient/1.0",
"Accept": "application/json",
}
if headers:
safe_headers.update(headers)
if not self.session:
raise RuntimeError("Client not initialized")
try:
async with self.session.get(
url,
headers=safe_headers,
allow_redirects=True,
max_redirects=5
) as response:
# Проверка Content-Length перед загрузкой
content_length = response.headers.get('Content-Length')
if content_length and int(content_length) > max_size:
raise SecurityError(
f"Response too large: {content_length} > {max_size}"
)
# Читаем с ограничением размера
content = bytearray()
async for chunk in response.content.iter_chunked(8192):
content.extend(chunk)
if len(content) > max_size:
raise SecurityError(
f"Response exceeded max_size: {len(content)}"
)
return bytes(content)
except asyncio.TimeoutError:
logger.error(f"Request timeout: {url}")
raise
except aiohttp.ClientError as e:
logger.error(f"HTTP error for {url}: {e}")
raise
class SecurityError(Exception):
"""Security-related error."""
pass3. Защита от SQL Injection
import asyncpg
from typing import List, Dict, Any, Optional
class SecureDatabase:
"""Database клиент с защитой от SQL injection."""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool: Optional[asyncpg.Pool] = None
async def connect(self) -> None:
"""Создание connection pool."""
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=10,
max_size=20,
command_timeout=60,
# Безопасные настройки
server_settings={
'application_name': 'secure_app',
'jit': 'off' # Отключаем JIT для стабильности
}
)
async def fetch_user_safe(self, user_id: int) -> Optional[Dict[str, Any]]:
"""
✅ ПРАВИЛЬНО: Используем параметризованные запросы.
Защита от SQL injection через prepared statements.
"""
if not self.pool:
raise RuntimeError("Database not connected")
async with self.pool.acquire() as conn:
# Параметры передаются отдельно — безопасно
row = await conn.fetchrow(
"SELECT id, name, email FROM users WHERE id = $1",
user_id
)
return dict(row) if row else None
async def search_users_unsafe(self, query: str) -> List[Dict[str, Any]]:
"""
❌ НЕПРАВИЛЬНО: SQL injection уязвимость!
НЕ ИСПОЛЬЗУЙТЕ В PRODUCTION!
"""
if not self.pool:
raise RuntimeError("Database not connected")
# ОПАСНО: Конкатенация строк
sql = f"SELECT * FROM users WHERE name LIKE '%{query}%'"
async with self.pool.acquire() as conn:
rows = await conn.fetch(sql)
return [dict(row) for row in rows]
# Атакующий может передать: query = "'; DROP TABLE users; --"
async def search_users_safe(self, query: str) -> List[Dict[str, Any]]:
"""
✅ ПРАВИЛЬНО: Защита от SQL injection.
Параметризованный запрос + валидация ввода.
"""
if not self.pool:
raise RuntimeError("Database not connected")
# Валидация входных данных
if len(query) > 100:
raise ValueError("Query too long")
# Удаляем опасные символы
safe_query = query.replace("'", "").replace(";", "").replace("--", "")
async with self.pool.acquire() as conn:
# Параметры через $1 — безопасно
rows = await conn.fetch(
"SELECT id, name, email FROM users WHERE name ILIKE $1",
f"%{safe_query}%"
)
return [dict(row) for row in rows]4. Безопасное хранение секретов
import os
from typing import Optional
from cryptography.fernet import Fernet
import base64
import hashlib
class SecureConfig:
"""Безопасное управление конфигурацией и секретами."""
def __init__(self):
# Ключ шифрования из environment (НЕ хардкодим!)
encryption_key = os.getenv('ENCRYPTION_KEY')
if not encryption_key:
raise ValueError("ENCRYPTION_KEY not set")
# Генерируем Fernet ключ из пароля
key = base64.urlsafe_b64encode(
hashlib.sha256(encryption_key.encode()).digest()
)
self.cipher = Fernet(key)
def get_secret(self, key: str) -> Optional[str]:
"""
Получение секрета из environment variables.
✅ ПРАВИЛЬНО: Секреты хранятся в env, не в коде.
"""
encrypted = os.getenv(key)
if not encrypted:
return None
try:
decrypted = self.cipher.decrypt(encrypted.encode())
return decrypted.decode()
except Exception:
return None
@staticmethod
def mask_sensitive(data: str, show_chars: int = 4) -> str:
"""Маскирует чувствительные данные для логов."""
if len(data) <= show_chars:
return '*' * len(data)
return data[:show_chars] + '*' * (len(data) - show_chars)
# Использование
config = SecureConfig()
# ✅ ПРАВИЛЬНО: Секреты из env
db_password = config.get_secret('DB_PASSWORD')
api_key = config.get_secret('API_KEY')
# ✅ ПРАВИЛЬНО: Маскируем в логах
logger.info(f"Connecting with key: {config.mask_sensitive(api_key)}")
# Output: "Connecting with key: sk_t********************"
# ❌ НИКОГДА не делайте так:
# DB_PASSWORD = "my_secret_password" # Хардкод секрета!
# logger.info(f"Using password: {db_password}") # Секрет в логах!5. Rate Limiting с защитой от abuse
import asyncio
import time
from collections import defaultdict
from typing import Dict, Tuple
import logging
logger = logging.getLogger(__name__)
class RateLimiter:
"""
Rate limiter с защитой от abuse.
Implements sliding window algorithm.
"""
def __init__(
self,
max_requests: int = 100,
window_seconds: int = 60,
ban_threshold: int = 5
):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.ban_threshold = ban_threshold
# История запросов: {client_id: [(timestamp, count)]}
self.requests: Dict[str, list] = defaultdict(list)
# Забаненные клиенты: {client_id: unban_timestamp}
self.banned: Dict[str, float] = {}
async def is_allowed(self, client_id: str) -> bool:
"""
Проверяет, разрешен ли запрос для клиента.
Args:
client_id: Идентификатор клиента (IP, user_id, etc.)
Returns:
True если запрос разрешен, False если превышен лимит
"""
now = time.time()
# Проверяем бан
if client_id in self.banned:
if now < self.banned[client_id]:
logger.warning(f"Client {client_id} is banned")
return False
else:
# Бан истек
del self.banned[client_id]
# Очищаем старые запросы
cutoff = now - self.window_seconds
self.requests[client_id] = [
(ts, count) for ts, count in self.requests[client_id]
if ts > cutoff
]
# Подсчитываем запросы в окне
total_requests = sum(count for _, count in self.requests[client_id])
if total_requests >= self.max_requests:
# Превышен лимит
logger.warning(
f"Rate limit exceeded for {client_id}: "
f"{total_requests}/{self.max_requests}"
)
# Банним при частых превышениях
violation_count = sum(
1 for ts, _ in self.requests[client_id]
if ts > now - 10 # Последние 10 секунд
)
if violation_count >= self.ban_threshold:
ban_duration = 3600 # 1 час
self.banned[client_id] = now + ban_duration
logger.error(
f"Client {client_id} banned for {ban_duration}s "
f"due to abuse"
)
return False
# Разрешаем запрос
self.requests[client_id].append((now, 1))
return True
# Использование в API handler
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)
async def api_handler(request):
"""API handler с rate limiting."""
client_id = request.remote_addr # IP адрес
if not await rate_limiter.is_allowed(client_id):
return {
"error": "Rate limit exceeded",
"retry_after": 60
}, 429
# Обрабатываем запрос
return await process_request(request)Security Checklist
Перед деплоем в production:
- ✅ Все секреты в environment variables, не в коде
- ✅ SSL/TLS для всех внешних подключений
- ✅ Валидация и санитизация всех входных данных
- ✅ Параметризованные SQL запросы (защита от injection)
- ✅ Rate limiting на всех публичных endpoints
- ✅ Ограничение размера запросов/ответов
- ✅ Таймауты на всех внешних вызовах
- ✅ Ограничение количества одновременных задач
- ✅ Логирование с маскированием чувствительных данных
- ✅ Обработка всех исключений (не показываем стектрейсы клиенту)
- ✅ CORS настроен корректно
- ✅ Регулярные security audits зависимостей
Миграция Legacy Кода на Asyncio
Постепенный переход: Не нужно переписывать всё сразу. Asyncio можно
внедрять постепенно, используя run_in_executor() для оборачивания
синхронного кода.
Стратегия миграции
Этап 1: Идентификация узких мест
import cProfile
import pstats
from io import StringIO
# Профилируем существующий синхронный код
def profile_sync_code():
profiler = cProfile.Profile()
profiler.enable()
# Ваш синхронный код
result = sync_process_data()
profiler.disable()
stream = StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats('cumulative')
stats.print_stats(20)
print(stream.getvalue())
# Ищем медленные I/O операции:
# - HTTP запросы (requests.get)
# - Запросы к БД (psycopg2, pymongo)
# - Файловые операции (open, read, write)
# - time.sleepЭтап 2: Wrap в run_in_executor
import asyncio
import requests # Синхронная библиотека
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Any
# Исходный синхронный код
def sync_fetch_url(url: str) -> Dict[str, Any]:
"""Старая синхронная функция."""
response = requests.get(url, timeout=10)
return response.json()
def sync_process_data(urls: List[str]) -> List[Dict[str, Any]]:
"""Старая синхронная обработка."""
results = []
for url in urls:
data = sync_fetch_url(url)
results.append(data)
return results
# Этап 2: Оборачиваем в executor
async def async_fetch_url_v1(url: str) -> Dict[str, Any]:
"""
Версия 1: Обернули синхронную функцию в executor.
Плюсы: Работает сразу, не нужно менять код
Минусы: Overhead на создание потоков, не настоящий async
"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None, # Используем default ThreadPoolExecutor
sync_fetch_url,
url
)
async def async_process_data_v1(urls: List[str]) -> List[Dict[str, Any]]:
"""
Версия 1: Параллелим через executor.
Улучшение: Теперь запросы выполняются параллельно!
"""
tasks = [async_fetch_url_v1(url) for url in urls]
return await asyncio.gather(*tasks)
# Этап 3: Переписываем на настоящий async
async def async_fetch_url_v2(url: str) -> Dict[str, Any]:
"""
Версия 2: Полностью асинхронная версия.
Плюсы: Настоящий async, лучшая производительность
Минусы: Требует переписывания и замены библиотек
"""
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
return await resp.json()
async def async_process_data_v2(urls: List[str]) -> List[Dict[str, Any]]:
"""
Версия 2: Чистый async код.
Максимальная производительность и эффективность.
"""
tasks = [async_fetch_url_v2(url) for url in urls]
return await asyncio.gather(*tasks)
# Сравнение производительности
import time
async def benchmark_migration():
"""Бенчмарк трех подходов."""
urls = ["https://httpbin.org/delay/1"] * 10
# Синхронный (baseline)
start = time.time()
sync_results = sync_process_data(urls)
sync_time = time.time() - start
print(f"Sync: {sync_time:.2f}s") # ~10 секунд
# Executor (промежуточный)
start = time.time()
executor_results = await async_process_data_v1(urls)
executor_time = time.time() - start
print(f"Executor: {executor_time:.2f}s") # ~1-2 секунды
# Full async (финальный)
start = time.time()
async_results = await async_process_data_v2(urls)
async_time = time.time() - start
print(f"Async: {async_time:.2f}s") # ~1 секунда
print(f"\nУскорение executor: {sync_time/executor_time:.1f}x")
print(f"Ускорение async: {sync_time/async_time:.1f}x")Миграция Database кода
import asyncio
import psycopg2 # Синхронная библиотека
import asyncpg # Асинхронная библиотека
from contextlib import asynccontextmanager
# Старый синхронный код
class SyncDatabase:
"""Старый синхронный database wrapper."""
def __init__(self, dsn: str):
self.conn = psycopg2.connect(dsn)
def fetch_users(self) -> list:
"""Синхронный запрос."""
cursor = self.conn.cursor()
cursor.execute("SELECT id, name FROM users")
return cursor.fetchall()
def close(self):
self.conn.close()
# Этап 1: Wrap в executor
class HybridDatabase:
"""
Гибридный database wrapper.
Использует синхронный код через executor.
"""
def __init__(self, dsn: str):
self.dsn = dsn
self.conn = None
self.executor = ThreadPoolExecutor(max_workers=10)
def connect(self):
"""Синхронное подключение."""
self.conn = psycopg2.connect(self.dsn)
async def fetch_users(self) -> list:
"""Async обертка над синхронным запросом."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
self.executor,
self._sync_fetch_users
)
def _sync_fetch_users(self) -> list:
"""Синхронный запрос (internal)."""
cursor = self.conn.cursor()
cursor.execute("SELECT id, name FROM users")
return cursor.fetchall()
async def close(self):
"""Async закрытие."""
loop = asyncio.get_running_loop()
await loop.run_in_executor(
self.executor,
self.conn.close
)
# Этап 2: Полностью async
class AsyncDatabase:
"""
Полностью асинхронный database wrapper.
Использует asyncpg для настоящего async I/O.
"""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool = None
async def connect(self):
"""Асинхронное подключение с pooling."""
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=10,
max_size=20
)
async def fetch_users(self) -> list:
"""Асинхронный запрос."""
async with self.pool.acquire() as conn:
rows = await conn.fetch("SELECT id, name FROM users")
return [dict(row) for row in rows]
async def close(self):
"""Асинхронное закрытие."""
if self.pool:
await self.pool.close()Постепенная миграция FastAPI приложения
from fastapi import FastAPI
import asyncio
app = FastAPI()
# Этап 1: Синхронные endpoints (работают, но блокируют)
@app.get("/sync/users")
def get_users_sync():
"""Синхронный endpoint - блокирует event loop."""
users = sync_database.fetch_users() # Блокирующий вызов
return {"users": users}
# Этап 2: Executor для legacy кода
@app.get("/hybrid/users")
async def get_users_hybrid():
"""Hybrid endpoint - использует executor."""
loop = asyncio.get_running_loop()
users = await loop.run_in_executor(
None,
sync_database.fetch_users
)
return {"users": users}
# Этап 3: Полностью async
@app.get("/async/users")
async def get_users_async():
"""Полностью асинхронный endpoint."""
users = await async_database.fetch_users()
return {"users": users}
# Performance improvement:
# sync -> hybrid: ~2-5x faster под нагрузкой
# hybrid -> async: ~1.5-2x faster + меньше памятиРекомендации по миграции
Порядок миграции:
- Профилируйте — найдите самые медленные части
- Оборачивайте — используйте
run_in_executor()для I/O - Тестируйте — убедитесь что работает как раньше
- Переписывайте — постепенно заменяйте на async библиотеки
- Оптимизируйте — убирайте executor, используйте чистый async
Что мигрировать в первую очередь:
- ✅ HTTP API calls (requests → aiohttp)
- ✅ Database queries (psycopg2 → asyncpg, pymongo → motor)
- ✅ File operations (open → aiofiles)
- ✅ Redis (redis-py → aioredis)
- ❌ CPU-intensive код (лучше использовать multiprocessing)
Мониторинг с Prometheus
Production мониторинг: Prometheus — индустриальный стандарт для сбора метрик. Интеграция с asyncio позволяет отслеживать производительность и здоровье приложения в реальном времени.
Базовая настройка Prometheus
# Установка
pip install prometheus-client aiohttpimport asyncio
import time
from prometheus_client import Counter, Histogram, Gauge, Info
from prometheus_client import start_http_server
from typing import Callable, Any
import functools
# Определяем метрики
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint'],
buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0)
)
ACTIVE_TASKS = Gauge(
'asyncio_tasks_active',
'Number of active asyncio tasks'
)
TASK_DURATION = Histogram(
'asyncio_task_duration_seconds',
'Task execution duration',
['task_name'],
buckets=(0.001, 0.01, 0.1, 0.5, 1.0, 5.0)
)
DATABASE_CONNECTIONS = Gauge(
'database_connections_active',
'Active database connections'
)
CACHE_HITS = Counter(
'cache_hits_total',
'Total cache hits'
)
CACHE_MISSES = Counter(
'cache_misses_total',
'Total cache misses'
)
APP_INFO = Info(
'app_info',
'Application information'
)
# Устанавливаем информацию о приложении
APP_INFO.info({
'version': '1.0.0',
'python_version': '3.11',
'environment': 'production'
})Декораторы для автоматического мониторинга
def monitor_async_task(task_name: str = None):
"""
Декоратор для мониторинга asyncio задач.
Отслеживает:
- Время выполнения
- Количество активных задач
- Ошибки
"""
def decorator(func: Callable) -> Callable:
name = task_name or func.__name__
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
ACTIVE_TASKS.inc()
start_time = time.time()
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
TASK_DURATION.labels(task_name=name).observe(duration)
return result
except Exception as e:
# Метрика ошибок
ERROR_COUNT.labels(
task_name=name,
error_type=type(e).__name__
).inc()
raise
finally:
ACTIVE_TASKS.dec()
return wrapper
return decorator
ERROR_COUNT = Counter(
'task_errors_total',
'Total task errors',
['task_name', 'error_type']
)
# Использование
@monitor_async_task(task_name="fetch_user_data")
async def fetch_user_data(user_id: int):
"""Функция с автоматическим мониторингом."""
await asyncio.sleep(0.1)
return {"id": user_id, "name": "User"}Мониторинг HTTP endpoints
from aiohttp import web
import time
async def monitor_request(request, handler):
"""Middleware для мониторинга HTTP запросов."""
start_time = time.time()
method = request.method
endpoint = request.path
try:
response = await handler(request)
status = response.status
# Записываем метрики
REQUEST_COUNT.labels(
method=method,
endpoint=endpoint,
status=status
).inc()
duration = time.time() - start_time
REQUEST_DURATION.labels(
method=method,
endpoint=endpoint
).observe(duration)
return response
except Exception as e:
# Ошибка обработки запроса
REQUEST_COUNT.labels(
method=method,
endpoint=endpoint,
status=500
).inc()
raise
# Создание app с middleware
app = web.Application(middlewares=[monitor_request])Кастомные метрики для бизнес-логики
# Метрики для кэша
class MonitoredCache:
"""Cache с Prometheus метриками."""
def __init__(self):
self.cache = {}
async def get(self, key: str):
"""Get с отслеживанием hit/miss."""
if key in self.cache:
CACHE_HITS.inc()
return self.cache[key]
else:
CACHE_MISSES.inc()
return None
async def set(self, key: str, value: Any):
"""Set значения в кэш."""
self.cache[key] = value
# Метрики для database pool
class MonitoredDatabase:
"""Database с метриками connection pool."""
def __init__(self, pool):
self.pool = pool
async def execute(self, query: str):
"""Execute с отслеживанием подключений."""
DATABASE_CONNECTIONS.inc()
try:
async with self.pool.acquire() as conn:
return await conn.execute(query)
finally:
DATABASE_CONNECTIONS.dec()Production setup
import asyncio
from prometheus_client import start_http_server
async def main():
"""Главная функция с Prometheus мониторингом."""
# Запускаем Prometheus HTTP сервер на порту 9090
start_http_server(9090)
print("Prometheus metrics available at http://localhost:9090/metrics")
# Ваше приложение
app = create_application()
await app.run()
if __name__ == "__main__":
asyncio.run(main())Grafana Dashboard запросы
# CPU utilization для asyncio tasks
rate(asyncio_task_duration_seconds_sum[5m])
# Количество активных задач
asyncio_tasks_active
# HTTP request rate
rate(http_requests_total[1m])
# HTTP latency (95th percentile)
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))
# Error rate
rate(task_errors_total[5m])
# Cache hit rate
rate(cache_hits_total[5m]) / (rate(cache_hits_total[5m]) + rate(cache_misses_total[5m]))
# Database connection pool usage
database_connections_activeAlerts для критических метрик
# prometheus_alerts.yml
groups:
- name: asyncio_app
rules:
# Слишком много активных задач
- alert: TooManyActiveTasks
expr: asyncio_tasks_active > 1000
for: 5m
annotations:
summary: "Too many active asyncio tasks"
# Высокая задержка
- alert: HighLatency
expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1
for: 5m
annotations:
summary: "95th percentile latency above 1s"
# Высокий error rate
- alert: HighErrorRate
expr: rate(task_errors_total[5m]) > 10
for: 5m
annotations:
summary: "High error rate detected"Теперь ваше asyncio приложение полностью observable и готово к production!