Перейти к содержимому
databaseСредний80 минут

PostgreSQL для Python-разработчиков

Полное практическое руководство по работе с PostgreSQL в Python - от SQLAlchemy до оптимизации запросов и асинхронной работы с asyncpg

#postgresql#python#sqlalchemy#asyncpg#database#sql#orm#индексы#транзакции#оптимизация

PostgreSQL для Python-разработчиков

PostgreSQL - это не просто база данных, это швейцарский нож для хранения данных. JSON, полнотекстовый поиск, GIS, массивы - всё из коробки.

Как пользоваться материалом

  • Спешишь? Читай введение, SQLAlchemy Core/ORM и раздел про N+1 проблему.
  • Оптимизируешь производительность? Изучи индексы, EXPLAIN ANALYZE и партиционирование.
  • Пишешь асинхронное приложение? Переходи сразу к asyncpg и асинхронному SQLAlchemy.
  • Работаешь с транзакциями? Смотри раздел про isolation levels и конкуррентность.
  • Мигрируешь с другой БД? Начни с установки и основ PostgreSQL.

Глоссарий

ORM (Object-Relational Mapping) - Подход, при котором таблицы БД представляются как Python-классы, а строки как объекты.

SQLAlchemy Core - Низкоуровневый SQL-конструктор SQLAlchemy без ORM магии.

SQLAlchemy ORM - Высокоуровневый слой SQLAlchemy с моделями, сессиями и автоматическими JOIN.

Session - Контекст работы с БД в SQLAlchemy ORM. Хранит изменения до commit/rollback.

Query - Объект SQLAlchemy для построения SELECT запросов.

N+1 Problem - Антипаттерн, когда вместо одного запроса с JOIN делается N+1 запросов (1 для основной таблицы + N для связанных).

Eager Loading - Загрузка связанных объектов сразу (через JOIN), предотвращает N+1.

Lazy Loading - Отложенная загрузка связанных объектов (по требованию). Может вызвать N+1.

Transaction - Группа операций БД, выполняющихся атомарно (всё или ничего).

ACID - Atomicity, Consistency, Isolation, Durability. Гарантии транзакций.

Isolation Level - Уровень изоляции транзакций (READ COMMITTED, REPEATABLE READ, SERIALIZABLE).

Index - Структура данных для ускорения поиска. Аналог индекса в книге.

B-tree Index - Стандартный индекс PostgreSQL для сравнений (=, <, >, BETWEEN).

GIN Index - Generalized Inverted Index для массивов, JSON, полнотекстового поиска.

Partial Index - Индекс только для части таблицы (WHERE условие).

EXPLAIN ANALYZE - Команда для анализа плана выполнения запроса и реального времени.

Connection Pool - Пул переиспользуемых подключений к БД для снижения overhead.

Migration - Версионирование схемы БД. Скрипты для изменения структуры таблиц.

Alembic - Инструмент миграций для SQLAlchemy.

asyncpg - Асинхронный драйвер PostgreSQL для Python (самый быстрый).

psycopg2 - Синхронный драйвер PostgreSQL (стандарт индустрии).

Введение: Почему PostgreSQL?

PostgreSQL vs. MySQL vs. SQLite

┌─────────────────┬──────────────┬──────────────┬──────────────┐
│   Характеристика│  PostgreSQL  │    MySQL     │    SQLite    │
├─────────────────┼──────────────┼──────────────┼──────────────┤
│ Соответствие SQL│   Отлично    │   Хорошо     │   Хорошо     │
│ JSON поддержка  │   Нативная   │   Базовая    │   Нет        │
│ Полнотекст поиск│   Встроенный │   Встроенный │   FTS5       │
│ Window Functions│   Да         │   Да (5.7+)  │   Да (3.25+) │
│ Массивы         │   Да         │   Нет        │   Нет        │
│ JSONB           │   Да         │   Нет        │   Нет        │
│ Расширения      │   Огромное   │   Мало       │   Нет        │
│ Конкуррентность │   MVCC       │   Locks      │   Locks      │
│ Use case        │   Всё        │   Web apps   │   Embedded   │
└─────────────────┴──────────────┴──────────────┴──────────────┘

Когда выбирать PostgreSQL:

  • ✅ Сложные запросы и аналитика
  • ✅ JSON/JSONB данные (NoSQL внутри SQL)
  • ✅ Геоданные (PostGIS расширение)
  • ✅ Полнотекстовый поиск
  • ✅ Высокая конкуррентность (MVCC)
  • ✅ Соответствие стандартам SQL

Когда можно выбрать MySQL:

  • Простые CRUD операции
  • Репликация из коробки проще
  • Compatibility с legacy системами

Когда выбрать SQLite:

  • Embedded приложения
  • Прототипы и тестирование
  • Мобильные приложения
  • Не нужна конкуррентность

Установка и настройка

Установка PostgreSQL

macOS (Homebrew):

brew install postgresql@16
brew services start postgresql@16
 
# Проверка
psql --version

Ubuntu/Debian:

sudo apt update
sudo apt install postgresql postgresql-contrib
 
# Запуск
sudo systemctl start postgresql
sudo systemctl enable postgresql
 
# Вход под пользователем postgres
sudo -u postgres psql

Docker (рекомендуется для разработки):

docker run -d \
  --name postgres-dev \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_DB=myapp \
  -p 5432:5432 \
  -v postgres_data:/var/lib/postgresql/data \
  postgres:16-alpine
# docker-compose.yml
version: "3.8"
 
services:
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: myapp
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
 
volumes:
  postgres_data:

Установка Python библиотек

# Синхронные драйверы
pip install psycopg2-binary  # Или psycopg2 (требует компиляцию)
 
# SQLAlchemy
pip install sqlalchemy
 
# Асинхронные драйверы
pip install asyncpg
pip install sqlalchemy[asyncio]  # SQLAlchemy 2.0 с asyncio
 
# Миграции
pip install alembic
 
# Все вместе для веб-приложения
pip install \
  sqlalchemy[asyncio] \
  asyncpg \
  psycopg2-binary \
  alembic

Базовая настройка PostgreSQL

# Вход в psql
psql -U postgres
 
# Создание БД
CREATE DATABASE myapp;
 
# Создание пользователя
CREATE USER myuser WITH PASSWORD 'mypassword';
 
# Права доступа
GRANT ALL PRIVILEGES ON DATABASE myapp TO myuser;
 
# Вход под новым пользователем
\q
psql -U myuser -d myapp

SQLAlchemy Core: SQL без ORM

SQLAlchemy Core - это мощный SQL конструктор без магии ORM. Используйте его для сложных запросов, bulk operations и максимального контроля.

Подключение к БД

from sqlalchemy import create_engine, text
 
# ========================================
# Синхронное подключение
# ========================================
 
# Формат: postgresql://user:password@host:port/database
DATABASE_URL = "postgresql://postgres:postgres@localhost:5432/myapp"
 
# Создание engine
engine = create_engine(
    DATABASE_URL,
    echo=True,  # Логировать все SQL запросы
    pool_size=10,  # Размер connection pool
    max_overflow=20,  # Дополнительные подключения при нагрузке
    pool_pre_ping=True,  # Проверка подключения перед использованием
)
 
# Выполнение запроса
with engine.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    print(result.fetchone())

Определение таблиц

from sqlalchemy import (
    MetaData,
    Table,
    Column,
    Integer,
    String,
    Text,
    DateTime,
    Boolean,
    ForeignKey,
    Index,
)
from datetime import datetime
 
metadata = MetaData()
 
# ========================================
# Таблица пользователей
# ========================================
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String(255), unique=True, nullable=False, index=True),
    Column("username", String(100), unique=True, nullable=False),
    Column("password_hash", String(255), nullable=False),
    Column("is_active", Boolean, default=True),
    Column("created_at", DateTime, default=datetime.utcnow),
    Column("updated_at", DateTime, default=datetime.utcnow, onupdate=datetime.utcnow),
)
 
# ========================================
# Таблица постов
# ========================================
posts = Table(
    "posts",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("title", String(255), nullable=False),
    Column("content", Text, nullable=False),
    Column("user_id", Integer, ForeignKey("users.id", ondelete="CASCADE")),
    Column("published", Boolean, default=False),
    Column("created_at", DateTime, default=datetime.utcnow),
    # Композитный индекс
    Index("idx_user_published", "user_id", "published"),
)
 
# Создание таблиц
metadata.create_all(engine)

CRUD операции (Core)

from sqlalchemy import select, insert, update, delete
 
# ========================================
# CREATE
# ========================================
 
# Вставка одной записи
stmt = insert(users).values(
    email="user@example.com",
    username="john_doe",
    password_hash="hashed_password",
)
 
with engine.connect() as conn:
    result = conn.execute(stmt)
    conn.commit()
    print(f"Inserted ID: {result.inserted_primary_key[0]}")
 
# Bulk insert
stmt = insert(users)
data = [
    {"email": "alice@example.com", "username": "alice", "password_hash": "hash1"},
    {"email": "bob@example.com", "username": "bob", "password_hash": "hash2"},
]
 
with engine.connect() as conn:
    conn.execute(stmt, data)
    conn.commit()
 
# ========================================
# READ
# ========================================
 
# Выборка всех пользователей
stmt = select(users)
 
with engine.connect() as conn:
    result = conn.execute(stmt)
    for row in result:
        print(row)
 
# Выборка с WHERE
stmt = select(users).where(users.c.is_active == True)
 
# Множественные условия
stmt = select(users).where(
    users.c.is_active == True,
    users.c.email.like("%@example.com"),
)
 
# ORDER BY
stmt = select(users).order_by(users.c.created_at.desc())
 
# LIMIT и OFFSET
stmt = select(users).limit(10).offset(20)
 
# Выборка конкретных колонок
stmt = select(users.c.id, users.c.email)
 
# ========================================
# JOIN
# ========================================
 
# INNER JOIN
stmt = (
    select(users.c.username, posts.c.title)
    .select_from(users.join(posts, users.c.id == posts.c.user_id))
    .where(posts.c.published == True)
)
 
# LEFT JOIN
stmt = select(users.c.username, posts.c.title).select_from(
    users.outerjoin(posts, users.c.id == posts.c.user_id)
)
 
# ========================================
# UPDATE
# ========================================
 
# Обновление одной записи
stmt = (
    update(users)
    .where(users.c.id == 1)
    .values(username="new_username", updated_at=datetime.utcnow())
)
 
with engine.connect() as conn:
    result = conn.execute(stmt)
    conn.commit()
    print(f"Updated {result.rowcount} rows")
 
# ========================================
# DELETE
# ========================================
 
# Удаление
stmt = delete(users).where(users.c.is_active == False)
 
with engine.connect() as conn:
    result = conn.execute(stmt)
    conn.commit()
    print(f"Deleted {result.rowcount} rows")

Агрегация и группировка

from sqlalchemy import func, and_, or_
 
# COUNT
stmt = select(func.count(users.c.id))
 
# GROUP BY
stmt = (
    select(posts.c.user_id, func.count(posts.c.id).label("post_count"))
    .group_by(posts.c.user_id)
)
 
# HAVING
stmt = (
    select(posts.c.user_id, func.count(posts.c.id).label("post_count"))
    .group_by(posts.c.user_id)
    .having(func.count(posts.c.id) > 5)
)
 
# Сложные агрегации
stmt = select(
    func.count(users.c.id).label("total_users"),
    func.count(users.c.id).filter(users.c.is_active == True).label("active_users"),
    func.max(users.c.created_at).label("newest_user"),
).select_from(users)

SQLAlchemy ORM: Объектный подход

ORM делает код чище и безопаснее, но добавляет абстракцию. Для простых CRUD операций ORM отлично, для сложной аналитики - используйте Core или raw SQL.

Определение моделей

from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Boolean, ForeignKey
from sqlalchemy.orm import DeclarativeBase, relationship, Session
from datetime import datetime
 
# ========================================
# SQLAlchemy 2.0 стиль
# ========================================
 
class Base(DeclarativeBase):
    pass
 
class User(Base):
    __tablename__ = "users"
 
    id = Column(Integer, primary_key=True)
    email = Column(String(255), unique=True, nullable=False, index=True)
    username = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
 
    # Связь с постами
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
 
    def __repr__(self):
        return f"<User(id={self.id}, username={self.username})>"
 
class Post(Base):
    __tablename__ = "posts"
 
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)
    content = Column(Text, nullable=False)
    user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    published = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
 
    # Обратная связь с пользователем
    author = relationship("User", back_populates="posts")
 
    def __repr__(self):
        return f"<Post(id={self.id}, title={self.title})>"
 
# Создание таблиц
DATABASE_URL = "postgresql://postgres:postgres@localhost:5432/myapp"
engine = create_engine(DATABASE_URL, echo=True)
Base.metadata.create_all(engine)

CRUD операции (ORM)

from sqlalchemy.orm import Session
 
# ========================================
# CREATE
# ========================================
 
with Session(engine) as session:
    # Создание пользователя
    user = User(
        email="alice@example.com",
        username="alice",
        password_hash="hashed_password",
    )
    session.add(user)
    session.commit()
    session.refresh(user)  # Обновить объект из БД (получить ID)
    print(f"Created user: {user.id}")
 
    # Bulk insert
    users = [
        User(email="bob@example.com", username="bob", password_hash="hash1"),
        User(email="charlie@example.com", username="charlie", password_hash="hash2"),
    ]
    session.add_all(users)
    session.commit()
 
# ========================================
# READ
# ========================================
 
with Session(engine) as session:
    # Получение по ID
    user = session.get(User, 1)
    print(user)
 
    # Получение первого совпадения
    user = session.query(User).filter(User.email == "alice@example.com").first()
 
    # Получение всех
    users = session.query(User).all()
 
    # Фильтрация
    active_users = session.query(User).filter(User.is_active == True).all()
 
    # Множественные фильтры
    users = (
        session.query(User)
        .filter(User.is_active == True)
        .filter(User.email.like("%@example.com"))
        .all()
    )
 
    # Сортировка
    users = session.query(User).order_by(User.created_at.desc()).all()
 
    # Пагинация
    users = session.query(User).limit(10).offset(20).all()
 
    # Подсчет
    count = session.query(User).count()
 
# ========================================
# UPDATE
# ========================================
 
with Session(engine) as session:
    user = session.get(User, 1)
    user.username = "new_username"
    user.updated_at = datetime.utcnow()
    session.commit()
 
    # Bulk update
    session.query(User).filter(User.is_active == False).update(
        {"is_active": True}, synchronize_session=False
    )
    session.commit()
 
# ========================================
# DELETE
# ========================================
 
with Session(engine) as session:
    user = session.get(User, 1)
    session.delete(user)
    session.commit()
 
    # Bulk delete
    session.query(User).filter(User.is_active == False).delete()
    session.commit()

Relationships и JOIN

# ========================================
# Доступ к связанным объектам
# ========================================
 
with Session(engine) as session:
    # Получить пользователя и его посты
    user = session.get(User, 1)
 
    # ❌ N+1 проблема! (lazy loading)
    for post in user.posts:  # Каждая итерация = новый SQL запрос
        print(post.title)
 
    # ✅ Eager loading с joinedload
    from sqlalchemy.orm import joinedload
 
    user = (
        session.query(User)
        .options(joinedload(User.posts))
        .filter(User.id == 1)
        .first()
    )
 
    # Теперь посты уже загружены одним запросом
    for post in user.posts:  # Без дополнительных запросов
        print(post.title)
 
# ========================================
# Типы eager loading
# ========================================
 
from sqlalchemy.orm import joinedload, selectinload, subqueryload
 
with Session(engine) as session:
    # joinedload - LEFT OUTER JOIN (один запрос)
    users = session.query(User).options(joinedload(User.posts)).all()
 
    # selectinload - два запроса (более эффективно для many-to-many)
    users = session.query(User).options(selectinload(User.posts)).all()
 
    # subqueryload - два запроса с подзапросом
    users = session.query(User).options(subqueryload(User.posts)).all()
 
# ========================================
# Ручной JOIN
# ========================================
 
with Session(engine) as session:
    results = (
        session.query(User, Post)
        .join(Post, User.id == Post.user_id)
        .filter(Post.published == True)
        .all()
    )
 
    for user, post in results:
        print(f"{user.username}: {post.title}")

N+1 Problem: Главная ловушка ORM

N+1 проблема - это когда вместо одного запроса с JOIN делается N+1 запросов. Это убийца производительности #1 в ORM приложениях.

Пример N+1 проблемы

# ❌ N+1 ПРОБЛЕМА
 
with Session(engine) as session:
    # 1 запрос: получить всех пользователей
    users = session.query(User).all()  # SELECT * FROM users
 
    # N запросов: для каждого пользователя получить посты
    for user in users:  # Если 100 пользователей = 100 запросов!
        print(f"{user.username} has {len(user.posts)} posts")
        # SELECT * FROM posts WHERE user_id = 1
        # SELECT * FROM posts WHERE user_id = 2
        # SELECT * FROM posts WHERE user_id = 3
        # ... 100 раз
 
# Итого: 101 запрос вместо 1!

Решение: Eager Loading

from sqlalchemy.orm import joinedload, selectinload
 
# ✅ РЕШЕНИЕ 1: joinedload (LEFT OUTER JOIN)
 
with Session(engine) as session:
    users = session.query(User).options(joinedload(User.posts)).all()
    # SELECT users.*, posts.*
    # FROM users
    # LEFT OUTER JOIN posts ON users.id = posts.user_id
 
    for user in users:
        print(f"{user.username} has {len(user.posts)} posts")  # Без доп. запросов!
 
# Итого: 1 запрос
 
# ✅ РЕШЕНИЕ 2: selectinload (два запроса, но эффективнее для больших данных)
 
with Session(engine) as session:
    users = session.query(User).options(selectinload(User.posts)).all()
    # Запрос 1: SELECT * FROM users
    # Запрос 2: SELECT * FROM posts WHERE user_id IN (1, 2, 3, ...)
 
    for user in users:
        print(f"{user.username} has {len(user.posts)} posts")
 
# Итого: 2 запроса вместо 101!

Вложенные relationship

# Модель с тремя уровнями вложенности
class Comment(Base):
    __tablename__ = "comments"
 
    id = Column(Integer, primary_key=True)
    text = Column(Text)
    post_id = Column(Integer, ForeignKey("posts.id"))
    user_id = Column(Integer, ForeignKey("users.id"))
 
    post = relationship("Post", back_populates="comments")
    author = relationship("User")
 
Post.comments = relationship("Comment", back_populates="post")
 
# ❌ N+1+N проблема
users = session.query(User).all()
for user in users:
    for post in user.posts:
        for comment in post.comments:
            print(comment.text)
 
# ✅ Вложенный eager loading
users = (
    session.query(User)
    .options(
        selectinload(User.posts).selectinload(Post.comments)
    )
    .all()
)
 
# Или через joinedload для всего дерева
users = (
    session.query(User)
    .options(
        joinedload(User.posts).joinedload(Post.comments)
    )
    .all()
)

Автоматическое обнаружение N+1

# Включите логирование SQL
import logging
 
logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
 
# Теперь вы увидите все SQL запросы в консоли
# Если видите десятки одинаковых SELECT - это N+1!

Индексы: Ускорение запросов

Правильные индексы - это разница между запросом за 0.5ms и запросом за 5 секунд. Изучите EXPLAIN ANALYZE - это ваш лучший друг.

Типы индексов в PostgreSQL

from sqlalchemy import Index, text
 
# ========================================
# 1. B-tree индекс (по умолчанию)
# ========================================
# Используется для: =, <, <=, >, >=, BETWEEN, IN, IS NULL
 
class User(Base):
    __tablename__ = "users"
 
    id = Column(Integer, primary_key=True)
    email = Column(String(255), index=True)  # Простой индекс
    username = Column(String(100))
 
    # Композитный индекс
    __table_args__ = (
        Index("idx_email_username", "email", "username"),
    )
 
# ========================================
# 2. Partial Index (условный индекс)
# ========================================
# Индекс только для части таблицы
 
class Post(Base):
    __tablename__ = "posts"
 
    id = Column(Integer, primary_key=True)
    published = Column(Boolean)
    created_at = Column(DateTime)
 
    # Индекс только для опубликованных постов
    __table_args__ = (
        Index(
            "idx_published_posts",
            "created_at",
            postgresql_where=text("published = true"),
        ),
    )
 
# ========================================
# 3. GIN индекс для JSONB и массивов
# ========================================
 
from sqlalchemy.dialects.postgresql import JSONB, ARRAY
 
class Product(Base):
    __tablename__ = "products"
 
    id = Column(Integer, primary_key=True)
    data = Column(JSONB)  # JSONB поле
    tags = Column(ARRAY(String))  # Массив
 
    __table_args__ = (
        Index("idx_product_data", "data", postgresql_using="gin"),
        Index("idx_product_tags", "tags", postgresql_using="gin"),
    )
 
# Теперь можно быстро искать по JSON и массивам:
# SELECT * FROM products WHERE data @> '{"color": "red"}'
# SELECT * FROM products WHERE tags @> ARRAY['electronics']
 
# ========================================
# 4. Full-text search индекс
# ========================================
 
from sqlalchemy.dialects.postgresql import TSVECTOR
 
class Article(Base):
    __tablename__ = "articles"
 
    id = Column(Integer, primary_key=True)
    title = Column(String(255))
    content = Column(Text)
    search_vector = Column(TSVECTOR)  # Полнотекстовый индекс
 
    __table_args__ = (
        Index("idx_search_vector", "search_vector", postgresql_using="gin"),
    )
 
# Создание trigger для автообновления search_vector в PostgreSQL:
"""
CREATE TRIGGER articles_search_update
BEFORE INSERT OR UPDATE ON articles
FOR EACH ROW EXECUTE FUNCTION
tsvector_update_trigger(search_vector, 'pg_catalog.english', title, content);
"""

EXPLAIN ANALYZE: Анализ запросов

from sqlalchemy import text
 
with Session(engine) as session:
    # ========================================
    # Анализ медленного запроса
    # ========================================
 
    # Вариант 1: Raw SQL
    query = text("""
        EXPLAIN ANALYZE
        SELECT * FROM users
        WHERE email LIKE '%@example.com'
        ORDER BY created_at DESC
        LIMIT 100
    """)
 
    result = session.execute(query)
    for row in result:
        print(row)
 
    # Вариант 2: SQLAlchemy ORM
    from sqlalchemy import select
    from sqlalchemy.dialects import postgresql
 
    stmt = (
        select(User)
        .where(User.email.like("%@example.com"))
        .order_by(User.created_at.desc())
        .limit(100)
    )
 
    # Получить SQL
    compiled = stmt.compile(
        dialect=postgresql.dialect(),
        compile_kwargs={"literal_binds": True}
    )
    print(compiled)
 
    # Выполнить с EXPLAIN
    explain_stmt = text(f"EXPLAIN ANALYZE {compiled}")
    result = session.execute(explain_stmt)
    for row in result:
        print(row)

Интерпретация EXPLAIN ANALYZE

-- Пример вывода EXPLAIN ANALYZE
 
Limit  (cost=0.29..856.29 rows=100 width=78) (actual time=0.045..1.234 rows=100 loops=1)
  ->  Index Scan using idx_email on users  (cost=0.29..8562.29 rows=1000 width=78) (actual time=0.044..1.198 rows=100 loops=1)
        Index Cond: (email ~~ '%@example.com'::text)
Planning Time: 0.156 ms
Execution Time: 1.289 ms
 
-- Что смотреть:
-- 1. actual time - реальное время выполнения
-- 2. rows - количество строк
-- 3. Index Scan vs Seq Scan - используется ли индекс
-- 4. cost - относительная стоимость операции

Признаки проблем:

❌ Seq Scan on large_table  - полное сканирование (нужен индекс!)
❌ actual time > 100ms      - медленный запрос
❌ rows=1000000             - слишком много строк, нужна фильтрация
❌ Nested Loop              - возможно N+1 проблема

✅ Index Scan               - используется индекс
✅ Bitmap Heap Scan         - эффективное использование индекса
✅ actual time < 10ms       - быстрый запрос

Создание индексов для медленных запросов

# Проблема: медленный поиск по email
session.query(User).filter(User.email == "test@example.com").first()
# Seq Scan on users (cost=0.00..431.00 rows=1 width=78) (actual time=45.234..45.234 rows=1 loops=1)
 
# Решение: добавить индекс
class User(Base):
    __tablename__ = "users"
    email = Column(String(255), index=True)  # ← Добавили индекс
 
# Или через DDL
with engine.connect() as conn:
    conn.execute(text("CREATE INDEX idx_users_email ON users(email)"))
    conn.commit()
 
# Теперь:
# Index Scan using idx_users_email on users (cost=0.29..8.30 rows=1 width=78) (actual time=0.023..0.023 rows=1 loops=1)
# Ускорение: 45ms → 0.023ms (в 2000 раз!)

Транзакции и конкуррентность

По умолчанию PostgreSQL использует READ COMMITTED. Для критичных операций (банковские транзакции, inventory) используйте SERIALIZABLE.

Основы транзакций

from sqlalchemy.orm import Session
 
# ========================================
# Автоматический commit
# ========================================
 
with Session(engine) as session:
    user = User(email="test@example.com", username="test")
    session.add(user)
    session.commit()  # ← Явный commit
 
# ========================================
# Rollback при ошибке
# ========================================
 
with Session(engine) as session:
    try:
        user = User(email="test@example.com", username="test")
        session.add(user)
 
        # Какая-то ошибка
        raise ValueError("Something went wrong")
 
        session.commit()
    except Exception as e:
        session.rollback()  # ← Откат изменений
        print(f"Error: {e}")
 
# ========================================
# Context manager (автоматический rollback)
# ========================================
 
try:
    with Session(engine) as session:
        with session.begin():  # ← Автоматический commit/rollback
            user = User(email="test@example.com", username="test")
            session.add(user)
            # Если нет ошибок → commit
            # Если ошибка → rollback
except Exception as e:
    print(f"Transaction failed: {e}")

Isolation Levels

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
 
# ========================================
# Установка isolation level
# ========================================
 
# Глобально для engine
engine = create_engine(
    DATABASE_URL,
    isolation_level="SERIALIZABLE"  # READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, SERIALIZABLE
)
 
# Или для конкретной сессии
with Session(engine) as session:
    session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
 
    # Ваши операции
    ...
 
# ========================================
# Уровни изоляции в PostgreSQL
# ========================================
 
# READ COMMITTED (по умолчанию)
# - Видит только committed данные
# - Каждый запрос видит новый snapshot
# - Подходит для большинства случаев
 
# REPEATABLE READ
# - Весь запрос видит snapshot на момент начала транзакции
# - Защищает от non-repeatable reads
# - Может быть serialization failure (нужен retry)
 
# SERIALIZABLE
# - Полная изоляция (как будто транзакции выполняются последовательно)
# - Защищает от всех аномалий
# - Высокая вероятность serialization failures
# - Нужен механизм retry

Пример: банковская транзакция

class Account(Base):
    __tablename__ = "accounts"
 
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    balance = Column(Integer, default=0)  # В копейках
 
def transfer_money(from_account_id: int, to_account_id: int, amount: int):
    """Перевод денег между счетами с правильной изоляцией"""
 
    max_retries = 3
 
    for attempt in range(max_retries):
        try:
            with Session(engine) as session:
                # SERIALIZABLE для критичных операций
                session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
 
                with session.begin():
                    # SELECT FOR UPDATE - блокировка строк
                    from_account = (
                        session.query(Account)
                        .filter(Account.id == from_account_id)
                        .with_for_update()  # ← Эксклюзивная блокировка
                        .first()
                    )
 
                    to_account = (
                        session.query(Account)
                        .filter(Account.id == to_account_id)
                        .with_for_update()
                        .first()
                    )
 
                    # Проверка баланса
                    if from_account.balance < amount:
                        raise ValueError("Insufficient funds")
 
                    # Перевод
                    from_account.balance -= amount
                    to_account.balance += amount
 
                    # Commit произойдет автоматически
                    print(f"Transfer successful: {amount} копеек")
                    return True
 
        except Exception as e:
            if "could not serialize" in str(e).lower():
                # Serialization failure - retry
                print(f"Serialization failure, retry {attempt + 1}/{max_retries}")
                continue
            else:
                # Другая ошибка
                raise
 
    raise Exception("Transfer failed after max retries")

Оптимистичные блокировки (Optimistic Locking)

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Session
 
class Document(Base):
    __tablename__ = "documents"
 
    id = Column(Integer, primary_key=True)
    title = Column(String(255))
    content = Column(Text)
    version = Column(Integer, default=1, nullable=False)  # ← Version field
 
# ========================================
# Использование
# ========================================
 
def update_document(document_id: int, new_content: str):
    with Session(engine) as session:
        # Читаем документ
        doc = session.get(Document, document_id)
        old_version = doc.version
 
        # Обновляем
        doc.content = new_content
        doc.version += 1
 
        # Сохраняем только если версия не изменилась
        from sqlalchemy import update
 
        stmt = (
            update(Document)
            .where(
                Document.id == document_id,
                Document.version == old_version  # ← Проверка версии
            )
            .values(content=new_content, version=old_version + 1)
        )
 
        result = session.execute(stmt)
 
        if result.rowcount == 0:
            # Кто-то другой уже обновил документ
            raise ValueError("Document was modified by another user")
 
        session.commit()

Асинхронная работа с PostgreSQL

asyncpg - самый быстрый PostgreSQL драйвер для Python. В 3-5 раз быстрее psycopg2. Используйте его для high-performance приложений.

asyncpg: Low-level драйвер

import asyncio
import asyncpg
 
# ========================================
# Подключение
# ========================================
 
async def main():
    # Создание connection pool
    pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        user="postgres",
        password="postgres",
        database="myapp",
        min_size=10,
        max_size=20,
    )
 
    # ========================================
    # SELECT
    # ========================================
 
    # Один результат
    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", 1)
        print(row["email"])
 
    # Несколько результатов
    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT * FROM users WHERE is_active = $1", True)
        for row in rows:
            print(row["username"])
 
    # ========================================
    # INSERT
    # ========================================
 
    async with pool.acquire() as conn:
        user_id = await conn.fetchval(
            """
            INSERT INTO users (email, username, password_hash)
            VALUES ($1, $2, $3)
            RETURNING id
            """,
            "test@example.com",
            "testuser",
            "hashed_password"
        )
        print(f"Created user: {user_id}")
 
    # ========================================
    # Bulk INSERT
    # ========================================
 
    async with pool.acquire() as conn:
        data = [
            ("alice@example.com", "alice", "hash1"),
            ("bob@example.com", "bob", "hash2"),
        ]
 
        await conn.executemany(
            "INSERT INTO users (email, username, password_hash) VALUES ($1, $2, $3)",
            data
        )
 
    # ========================================
    # COPY (самый быстрый способ bulk insert)
    # ========================================
 
    async with pool.acquire() as conn:
        # Подготовка данных
        data = [
            (f"user{i}@example.com", f"user{i}", f"hash{i}")
            for i in range(10000)
        ]
 
        # COPY FROM (в разы быстрее executemany)
        await conn.copy_records_to_table(
            "users",
            records=data,
            columns=["email", "username", "password_hash"]
        )
 
    # ========================================
    # Транзакции
    # ========================================
 
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Все или ничего
            await conn.execute("INSERT INTO users (...) VALUES (...)")
            await conn.execute("INSERT INTO posts (...) VALUES (...)")
            # Автоматический commit или rollback
 
    # Закрытие pool
    await pool.close()
 
# Запуск
asyncio.run(main())

SQLAlchemy 2.0 + asyncio

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, selectinload
from sqlalchemy import select
 
# ========================================
# Настройка async engine
# ========================================
 
# Обратите внимание: postgresql+asyncpg://
DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:5432/myapp"
 
async_engine = create_async_engine(
    DATABASE_URL,
    echo=True,
    pool_size=10,
    max_overflow=20,
)
 
# Session maker
AsyncSessionLocal = async_sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
 
# ========================================
# Модели (те же, что и для sync)
# ========================================
 
class Base(DeclarativeBase):
    pass
 
class User(Base):
    __tablename__ = "users"
    # ... те же поля
 
class Post(Base):
    __tablename__ = "posts"
    # ... те же поля
 
# ========================================
# CRUD операции (async)
# ========================================
 
async def create_user(email: str, username: str):
    async with AsyncSessionLocal() as session:
        user = User(email=email, username=username, password_hash="hash")
        session.add(user)
        await session.commit()
        await session.refresh(user)
        return user
 
async def get_user(user_id: int):
    async with AsyncSessionLocal() as session:
        user = await session.get(User, user_id)
        return user
 
async def get_users_with_posts():
    async with AsyncSessionLocal() as session:
        # Async version of selectinload
        stmt = select(User).options(selectinload(User.posts))
        result = await session.execute(stmt)
        users = result.scalars().all()
        return users
 
async def update_user(user_id: int, new_username: str):
    async with AsyncSessionLocal() as session:
        user = await session.get(User, user_id)
        user.username = new_username
        await session.commit()
 
async def delete_user(user_id: int):
    async with AsyncSessionLocal() as session:
        user = await session.get(User, user_id)
        await session.delete(user)
        await session.commit()
 
# ========================================
# Использование в FastAPI
# ========================================
 
from fastapi import FastAPI, Depends
from typing import AsyncGenerator
 
app = FastAPI()
 
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        yield session
 
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await db.get(User, user_id)
    return user
 
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
    stmt = select(User).limit(100)
    result = await db.execute(stmt)
    users = result.scalars().all()
    return users

Миграции с Alembic

Alembic - это Git для вашей базы данных. Каждое изменение схемы - отдельный коммит (миграция), который можно откатить.

Установка и инициализация

# Установка
pip install alembic
 
# Инициализация
alembic init alembic
 
# Структура проекта:
# alembic/
#   versions/  ← Здесь будут миграции
#   env.py     ← Настройки Alembic
# alembic.ini  ← Конфигурация

Настройка alembic

# alembic/env.py
 
from sqlalchemy import engine_from_config, pool
from alembic import context
from myapp.models import Base  # ← Ваши модели
 
# Настройка target_metadata
target_metadata = Base.metadata
 
def run_migrations_online():
    # URL из переменной окружения
    import os
    from sqlalchemy import create_engine
 
    url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/myapp")
 
    connectable = create_engine(url)
 
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )
 
        with context.begin_transaction():
            context.run_migrations()

Создание миграций

# Автогенерация миграции из изменений моделей
alembic revision --autogenerate -m "Add users table"
 
# Создастся файл: alembic/versions/xxxxx_add_users_table.py
 
# Применить миграцию
alembic upgrade head
 
# Откатить миграцию
alembic downgrade -1
 
# История миграций
alembic history
 
# Текущая версия БД
alembic current

Пример миграции

# alembic/versions/xxxxx_add_users_table.py
 
"""Add users table
 
Revision ID: xxxxx
Revises:
Create Date: 2024-01-01 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
 
# revision identifiers, used by Alembic.
revision = 'xxxxx'
down_revision = None
branch_labels = None
depends_on = None
 
def upgrade():
    # Создание таблицы
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('email', sa.String(length=255), nullable=False),
        sa.Column('username', sa.String(length=100), nullable=False),
        sa.Column('password_hash', sa.String(length=255), nullable=False),
        sa.Column('is_active', sa.Boolean(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
 
    # Создание индекса
    op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
 
def downgrade():
    # Откат изменений
    op.drop_index(op.f('ix_users_email'), table_name='users')
    op.drop_table('users')

Продвинутые возможности PostgreSQL

JSONB: NoSQL внутри SQL

from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import func
 
class Product(Base):
    __tablename__ = "products"
 
    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    attributes = Column(JSONB)  # JSON поле
 
# ========================================
# Вставка JSON данных
# ========================================
 
product = Product(
    name="Laptop",
    attributes={
        "brand": "Apple",
        "specs": {
            "cpu": "M2",
            "ram": 16,
            "storage": 512
        },
        "colors": ["silver", "space gray"]
    }
)
 
# ========================================
# Запросы к JSONB
# ========================================
 
# Поиск по ключу верхнего уровня
products = session.query(Product).filter(
    Product.attributes["brand"].astext == "Apple"
).all()
 
# Поиск по вложенному ключу
products = session.query(Product).filter(
    Product.attributes["specs"]["cpu"].astext == "M2"
).all()
 
# Проверка существования ключа
products = session.query(Product).filter(
    Product.attributes.has_key("warranty")
).all()
 
# Содержит JSON
products = session.query(Product).filter(
    Product.attributes.contains({"brand": "Apple"})
).all()
 
# Поиск в массиве
products = session.query(Product).filter(
    Product.attributes["colors"].contains(["silver"])
).all()

Полнотекстовый поиск

from sqlalchemy.dialects.postgresql import TSVECTOR
from sqlalchemy import func
 
class Article(Base):
    __tablename__ = "articles"
 
    id = Column(Integer, primary_key=True)
    title = Column(String(255))
    content = Column(Text)
    search_vector = Column(TSVECTOR)
 
# ========================================
# Полнотекстовый поиск
# ========================================
 
# Создание поискового вектора (в PostgreSQL)
# UPDATE articles SET search_vector =
#   to_tsvector('english', title || ' ' || content);
 
# Поиск
search_query = "python postgresql"
 
results = session.query(Article).filter(
    Article.search_vector.match(search_query)
).all()
 
# Поиск с ранжированием
results = (
    session.query(
        Article,
        func.ts_rank(Article.search_vector, func.to_tsquery(search_query)).label("rank")
    )
    .filter(Article.search_vector.match(search_query))
    .order_by(text("rank DESC"))
    .all()
)

Партиционирование (Partitioning)

-- Создание партиционированной таблицы в PostgreSQL
 
-- Партиционирование по диапазону (дата)
CREATE TABLE events (
    id SERIAL,
    event_type VARCHAR(50),
    data JSONB,
    created_at TIMESTAMP NOT NULL
) PARTITION BY RANGE (created_at);
 
-- Создание партиций
CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
 
CREATE TABLE events_2024_02 PARTITION OF events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
 
-- Вставка автоматически попадет в нужную партицию
INSERT INTO events (event_type, data, created_at)
VALUES ('click', '{"page": "/home"}', '2024-01-15');
 
-- Запрос автоматически использует нужную партицию
SELECT * FROM events WHERE created_at BETWEEN '2024-01-01' AND '2024-01-31';
# В SQLAlchemy партиционирование делается через raw SQL
# или через DDL events
 
from sqlalchemy import event, DDL
 
# Создание партиций при создании таблицы
event.listen(
    Base.metadata,
    'after_create',
    DDL("""
        CREATE TABLE IF NOT EXISTS events_2024_01 PARTITION OF events
        FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
    """)
)

Production Best Practices

Connection Pooling

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
 
engine = create_engine(
    DATABASE_URL,
    # ========================================
    # Connection Pool настройки
    # ========================================
    poolclass=QueuePool,
    pool_size=20,  # Постоянные подключения
    max_overflow=10,  # Дополнительные при пиковой нагрузке
    pool_timeout=30,  # Таймаут ожидания свободного подключения
    pool_recycle=3600,  # Пересоздание подключений каждый час
    pool_pre_ping=True,  # Проверка подключения перед использованием
    echo_pool=True,  # Логирование pool events
)
 
# Итого: 20 постоянных + до 10 временных = максимум 30 подключений

Оптимизация производительности

# ========================================
# 1. Bulk операции вместо циклов
# ========================================
 
# ❌ Медленно
for data in large_dataset:
    user = User(**data)
    session.add(user)
    session.commit()  # 10,000 коммитов!
 
# ✅ Быстро
session.bulk_insert_mappings(User, large_dataset)
session.commit()  # Один коммит
 
# ========================================
# 2. Batch обработка
# ========================================
 
# Обработка по частям для экономии памяти
from sqlalchemy import select
 
stmt = select(User).execution_options(yield_per=1000)
 
with Session(engine) as session:
    for partition in session.execute(stmt).partitions():
        for user in partition:
            # Обработка
            pass
 
# ========================================
# 3. Отключение автоflush для bulk операций
# ========================================
 
with Session(engine) as session:
    session.autoflush = False
 
    for i in range(10000):
        user = User(email=f"user{i}@example.com", username=f"user{i}")
        session.add(user)
 
        if i % 1000 == 0:
            session.flush()  # Ручной flush каждые 1000 записей
 
    session.commit()
 
# ========================================
# 4. Read-only запросы
# ========================================
 
# Пометить запрос как read-only (оптимизация PostgreSQL)
stmt = select(User).execution_options(postgresql_readonly=True)

Мониторинг и логирование

import logging
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
 
# ========================================
# Логирование медленных запросов
# ========================================
 
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())
 
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total_time = time.time() - conn.info['query_start_time'].pop()
 
    if total_time > 0.1:  # Логировать запросы дольше 100ms
        logger = logging.getLogger("slow_queries")
        logger.warning(
            f"Slow query ({total_time:.3f}s): {statement[:200]}"
        )

Заключение

PostgreSQL + Python - мощная комбинация для любого проекта: от стартапа до enterprise. Знание SQLAlchemy, индексов и асинхронности сделает вас незаменимым разработчиком.

Ключевые выводы

  1. SQLAlchemy ORM vs Core - ORM для простоты, Core для производительности
  2. N+1 проблема - главный враг производительности в ORM, решается через eager loading
  3. Индексы критичны - разница между 0.5ms и 5 секундами
  4. EXPLAIN ANALYZE - ваш лучший друг для оптимизации
  5. Транзакции и изоляция - правильный isolation level спасает от багов
  6. asyncpg - самый быстрый драйвер для high-performance приложений
  7. Миграции обязательны - Alembic делает изменения схемы безопасными
  8. Connection pooling - критично для production нагрузки

Чек-лист для production

  • Настроен connection pool (pool_size, max_overflow)
  • Включен pool_pre_ping для обнаружения мертвых подключений
  • Созданы индексы для всех частых WHERE/JOIN условий
  • Проверены медленные запросы через EXPLAIN ANALYZE
  • Настроены миграции через Alembic
  • Eager loading для предотвращения N+1
  • Мониторинг медленных запросов (logging)
  • Backup стратегия для данных
  • Правильный isolation level для критичных транзакций
  • Использование asyncpg/async SQLAlchemy для высоконагруженных сервисов

Следующие шаги

  1. Практика - создайте реальное приложение с PostgreSQL
  2. Оптимизация - найдите медленные запросы через EXPLAIN ANALYZE
  3. Масштабирование - изучите репликацию и шардирование
  4. Мониторинг - настройте pg_stat_statements и pgBadger
  5. Продвинутые фичи - PostGIS для геоданных, pg_trgm для fuzzy search

Готовы к следующему уровню? Прочитайте мою статью «PostgreSQL для бэкенд-разработчиков: 7 паттернов, которые я использую каждый день» — глубокий разбор JSONB, partial indexes, generated columns, row-level security, advisory locks, materialized views, partitioning и продвинутого мониторинга. С реальными кейсами из production и граблями, на которые я наступил за 8 лет работы.

Полезные ресурсы

Официальная документация:

Инструменты:

  • pgAdmin - GUI для PostgreSQL
  • DBeaver - универсальный DB клиент
  • pgcli - улучшенный psql с автодополнением
  • pg_stat_statements - анализ запросов

Книги:

  • «PostgreSQL: Up and Running» - Regina Obe, Leo Hsu
  • «High Performance PostgreSQL» - Ibrar Ahmed

Помните: База данных - это фундамент приложения. Инвестиции в изучение PostgreSQL и правильные паттерны работы с ним окупятся многократно через производительность, надежность и масштабируемость вашего проекта.