PostgreSQL для Python-разработчиков
Полное практическое руководство по работе с PostgreSQL в Python - от SQLAlchemy до оптимизации запросов и асинхронной работы с asyncpg
Оглавление
PostgreSQL для Python-разработчиков
PostgreSQL - это не просто база данных, это швейцарский нож для хранения данных. JSON, полнотекстовый поиск, GIS, массивы - всё из коробки.
Как пользоваться материалом
- Спешишь? Читай введение, SQLAlchemy Core/ORM и раздел про N+1 проблему.
- Оптимизируешь производительность? Изучи индексы, EXPLAIN ANALYZE и партиционирование.
- Пишешь асинхронное приложение? Переходи сразу к asyncpg и асинхронному SQLAlchemy.
- Работаешь с транзакциями? Смотри раздел про isolation levels и конкуррентность.
- Мигрируешь с другой БД? Начни с установки и основ PostgreSQL.
Глоссарий
ORM (Object-Relational Mapping) - Подход, при котором таблицы БД представляются как Python-классы, а строки как объекты.
SQLAlchemy Core - Низкоуровневый SQL-конструктор SQLAlchemy без ORM магии.
SQLAlchemy ORM - Высокоуровневый слой SQLAlchemy с моделями, сессиями и автоматическими JOIN.
Session - Контекст работы с БД в SQLAlchemy ORM. Хранит изменения до commit/rollback.
Query - Объект SQLAlchemy для построения SELECT запросов.
N+1 Problem - Антипаттерн, когда вместо одного запроса с JOIN делается N+1 запросов (1 для основной таблицы + N для связанных).
Eager Loading - Загрузка связанных объектов сразу (через JOIN), предотвращает N+1.
Lazy Loading - Отложенная загрузка связанных объектов (по требованию). Может вызвать N+1.
Transaction - Группа операций БД, выполняющихся атомарно (всё или ничего).
ACID - Atomicity, Consistency, Isolation, Durability. Гарантии транзакций.
Isolation Level - Уровень изоляции транзакций (READ COMMITTED, REPEATABLE READ, SERIALIZABLE).
Index - Структура данных для ускорения поиска. Аналог индекса в книге.
B-tree Index - Стандартный индекс PostgreSQL для сравнений (=, <, >, BETWEEN).
GIN Index - Generalized Inverted Index для массивов, JSON, полнотекстового поиска.
Partial Index - Индекс только для части таблицы (WHERE условие).
EXPLAIN ANALYZE - Команда для анализа плана выполнения запроса и реального времени.
Connection Pool - Пул переиспользуемых подключений к БД для снижения overhead.
Migration - Версионирование схемы БД. Скрипты для изменения структуры таблиц.
Alembic - Инструмент миграций для SQLAlchemy.
asyncpg - Асинхронный драйвер PostgreSQL для Python (самый быстрый).
psycopg2 - Синхронный драйвер PostgreSQL (стандарт индустрии).
Введение: Почему PostgreSQL?
PostgreSQL vs. MySQL vs. SQLite
┌─────────────────┬──────────────┬──────────────┬──────────────┐
│ Характеристика│ PostgreSQL │ MySQL │ SQLite │
├─────────────────┼──────────────┼──────────────┼──────────────┤
│ Соответствие SQL│ Отлично │ Хорошо │ Хорошо │
│ JSON поддержка │ Нативная │ Базовая │ Нет │
│ Полнотекст поиск│ Встроенный │ Встроенный │ FTS5 │
│ Window Functions│ Да │ Да (5.7+) │ Да (3.25+) │
│ Массивы │ Да │ Нет │ Нет │
│ JSONB │ Да │ Нет │ Нет │
│ Расширения │ Огромное │ Мало │ Нет │
│ Конкуррентность │ MVCC │ Locks │ Locks │
│ Use case │ Всё │ Web apps │ Embedded │
└─────────────────┴──────────────┴──────────────┴──────────────┘
Когда выбирать PostgreSQL:
- ✅ Сложные запросы и аналитика
- ✅ JSON/JSONB данные (NoSQL внутри SQL)
- ✅ Геоданные (PostGIS расширение)
- ✅ Полнотекстовый поиск
- ✅ Высокая конкуррентность (MVCC)
- ✅ Соответствие стандартам SQL
Когда можно выбрать MySQL:
- Простые CRUD операции
- Репликация из коробки проще
- Compatibility с legacy системами
Когда выбрать SQLite:
- Embedded приложения
- Прототипы и тестирование
- Мобильные приложения
- Не нужна конкуррентность
Установка и настройка
Установка PostgreSQL
macOS (Homebrew):
brew install postgresql@16
brew services start postgresql@16
# Проверка
psql --versionUbuntu/Debian:
sudo apt update
sudo apt install postgresql postgresql-contrib
# Запуск
sudo systemctl start postgresql
sudo systemctl enable postgresql
# Вход под пользователем postgres
sudo -u postgres psqlDocker (рекомендуется для разработки):
docker run -d \
--name postgres-dev \
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_USER=postgres \
-e POSTGRES_DB=myapp \
-p 5432:5432 \
-v postgres_data:/var/lib/postgresql/data \
postgres:16-alpine# docker-compose.yml
version: "3.8"
services:
db:
image: postgres:16-alpine
environment:
POSTGRES_DB: myapp
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:Установка Python библиотек
# Синхронные драйверы
pip install psycopg2-binary # Или psycopg2 (требует компиляцию)
# SQLAlchemy
pip install sqlalchemy
# Асинхронные драйверы
pip install asyncpg
pip install sqlalchemy[asyncio] # SQLAlchemy 2.0 с asyncio
# Миграции
pip install alembic
# Все вместе для веб-приложения
pip install \
sqlalchemy[asyncio] \
asyncpg \
psycopg2-binary \
alembicБазовая настройка PostgreSQL
# Вход в psql
psql -U postgres
# Создание БД
CREATE DATABASE myapp;
# Создание пользователя
CREATE USER myuser WITH PASSWORD 'mypassword';
# Права доступа
GRANT ALL PRIVILEGES ON DATABASE myapp TO myuser;
# Вход под новым пользователем
\q
psql -U myuser -d myappSQLAlchemy Core: SQL без ORM
SQLAlchemy Core - это мощный SQL конструктор без магии ORM. Используйте его для сложных запросов, bulk operations и максимального контроля.
Подключение к БД
from sqlalchemy import create_engine, text
# ========================================
# Синхронное подключение
# ========================================
# Формат: postgresql://user:password@host:port/database
DATABASE_URL = "postgresql://postgres:postgres@localhost:5432/myapp"
# Создание engine
engine = create_engine(
DATABASE_URL,
echo=True, # Логировать все SQL запросы
pool_size=10, # Размер connection pool
max_overflow=20, # Дополнительные подключения при нагрузке
pool_pre_ping=True, # Проверка подключения перед использованием
)
# Выполнение запроса
with engine.connect() as conn:
result = conn.execute(text("SELECT version()"))
print(result.fetchone())Определение таблиц
from sqlalchemy import (
MetaData,
Table,
Column,
Integer,
String,
Text,
DateTime,
Boolean,
ForeignKey,
Index,
)
from datetime import datetime
metadata = MetaData()
# ========================================
# Таблица пользователей
# ========================================
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True),
Column("email", String(255), unique=True, nullable=False, index=True),
Column("username", String(100), unique=True, nullable=False),
Column("password_hash", String(255), nullable=False),
Column("is_active", Boolean, default=True),
Column("created_at", DateTime, default=datetime.utcnow),
Column("updated_at", DateTime, default=datetime.utcnow, onupdate=datetime.utcnow),
)
# ========================================
# Таблица постов
# ========================================
posts = Table(
"posts",
metadata,
Column("id", Integer, primary_key=True),
Column("title", String(255), nullable=False),
Column("content", Text, nullable=False),
Column("user_id", Integer, ForeignKey("users.id", ondelete="CASCADE")),
Column("published", Boolean, default=False),
Column("created_at", DateTime, default=datetime.utcnow),
# Композитный индекс
Index("idx_user_published", "user_id", "published"),
)
# Создание таблиц
metadata.create_all(engine)CRUD операции (Core)
from sqlalchemy import select, insert, update, delete
# ========================================
# CREATE
# ========================================
# Вставка одной записи
stmt = insert(users).values(
email="user@example.com",
username="john_doe",
password_hash="hashed_password",
)
with engine.connect() as conn:
result = conn.execute(stmt)
conn.commit()
print(f"Inserted ID: {result.inserted_primary_key[0]}")
# Bulk insert
stmt = insert(users)
data = [
{"email": "alice@example.com", "username": "alice", "password_hash": "hash1"},
{"email": "bob@example.com", "username": "bob", "password_hash": "hash2"},
]
with engine.connect() as conn:
conn.execute(stmt, data)
conn.commit()
# ========================================
# READ
# ========================================
# Выборка всех пользователей
stmt = select(users)
with engine.connect() as conn:
result = conn.execute(stmt)
for row in result:
print(row)
# Выборка с WHERE
stmt = select(users).where(users.c.is_active == True)
# Множественные условия
stmt = select(users).where(
users.c.is_active == True,
users.c.email.like("%@example.com"),
)
# ORDER BY
stmt = select(users).order_by(users.c.created_at.desc())
# LIMIT и OFFSET
stmt = select(users).limit(10).offset(20)
# Выборка конкретных колонок
stmt = select(users.c.id, users.c.email)
# ========================================
# JOIN
# ========================================
# INNER JOIN
stmt = (
select(users.c.username, posts.c.title)
.select_from(users.join(posts, users.c.id == posts.c.user_id))
.where(posts.c.published == True)
)
# LEFT JOIN
stmt = select(users.c.username, posts.c.title).select_from(
users.outerjoin(posts, users.c.id == posts.c.user_id)
)
# ========================================
# UPDATE
# ========================================
# Обновление одной записи
stmt = (
update(users)
.where(users.c.id == 1)
.values(username="new_username", updated_at=datetime.utcnow())
)
with engine.connect() as conn:
result = conn.execute(stmt)
conn.commit()
print(f"Updated {result.rowcount} rows")
# ========================================
# DELETE
# ========================================
# Удаление
stmt = delete(users).where(users.c.is_active == False)
with engine.connect() as conn:
result = conn.execute(stmt)
conn.commit()
print(f"Deleted {result.rowcount} rows")Агрегация и группировка
from sqlalchemy import func, and_, or_
# COUNT
stmt = select(func.count(users.c.id))
# GROUP BY
stmt = (
select(posts.c.user_id, func.count(posts.c.id).label("post_count"))
.group_by(posts.c.user_id)
)
# HAVING
stmt = (
select(posts.c.user_id, func.count(posts.c.id).label("post_count"))
.group_by(posts.c.user_id)
.having(func.count(posts.c.id) > 5)
)
# Сложные агрегации
stmt = select(
func.count(users.c.id).label("total_users"),
func.count(users.c.id).filter(users.c.is_active == True).label("active_users"),
func.max(users.c.created_at).label("newest_user"),
).select_from(users)SQLAlchemy ORM: Объектный подход
ORM делает код чище и безопаснее, но добавляет абстракцию. Для простых CRUD операций ORM отлично, для сложной аналитики - используйте Core или raw SQL.
Определение моделей
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Boolean, ForeignKey
from sqlalchemy.orm import DeclarativeBase, relationship, Session
from datetime import datetime
# ========================================
# SQLAlchemy 2.0 стиль
# ========================================
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String(255), unique=True, nullable=False, index=True)
username = Column(String(100), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Связь с постами
posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
def __repr__(self):
return f"<User(id={self.id}, username={self.username})>"
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
title = Column(String(255), nullable=False)
content = Column(Text, nullable=False)
user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
published = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
# Обратная связь с пользователем
author = relationship("User", back_populates="posts")
def __repr__(self):
return f"<Post(id={self.id}, title={self.title})>"
# Создание таблиц
DATABASE_URL = "postgresql://postgres:postgres@localhost:5432/myapp"
engine = create_engine(DATABASE_URL, echo=True)
Base.metadata.create_all(engine)CRUD операции (ORM)
from sqlalchemy.orm import Session
# ========================================
# CREATE
# ========================================
with Session(engine) as session:
# Создание пользователя
user = User(
email="alice@example.com",
username="alice",
password_hash="hashed_password",
)
session.add(user)
session.commit()
session.refresh(user) # Обновить объект из БД (получить ID)
print(f"Created user: {user.id}")
# Bulk insert
users = [
User(email="bob@example.com", username="bob", password_hash="hash1"),
User(email="charlie@example.com", username="charlie", password_hash="hash2"),
]
session.add_all(users)
session.commit()
# ========================================
# READ
# ========================================
with Session(engine) as session:
# Получение по ID
user = session.get(User, 1)
print(user)
# Получение первого совпадения
user = session.query(User).filter(User.email == "alice@example.com").first()
# Получение всех
users = session.query(User).all()
# Фильтрация
active_users = session.query(User).filter(User.is_active == True).all()
# Множественные фильтры
users = (
session.query(User)
.filter(User.is_active == True)
.filter(User.email.like("%@example.com"))
.all()
)
# Сортировка
users = session.query(User).order_by(User.created_at.desc()).all()
# Пагинация
users = session.query(User).limit(10).offset(20).all()
# Подсчет
count = session.query(User).count()
# ========================================
# UPDATE
# ========================================
with Session(engine) as session:
user = session.get(User, 1)
user.username = "new_username"
user.updated_at = datetime.utcnow()
session.commit()
# Bulk update
session.query(User).filter(User.is_active == False).update(
{"is_active": True}, synchronize_session=False
)
session.commit()
# ========================================
# DELETE
# ========================================
with Session(engine) as session:
user = session.get(User, 1)
session.delete(user)
session.commit()
# Bulk delete
session.query(User).filter(User.is_active == False).delete()
session.commit()Relationships и JOIN
# ========================================
# Доступ к связанным объектам
# ========================================
with Session(engine) as session:
# Получить пользователя и его посты
user = session.get(User, 1)
# ❌ N+1 проблема! (lazy loading)
for post in user.posts: # Каждая итерация = новый SQL запрос
print(post.title)
# ✅ Eager loading с joinedload
from sqlalchemy.orm import joinedload
user = (
session.query(User)
.options(joinedload(User.posts))
.filter(User.id == 1)
.first()
)
# Теперь посты уже загружены одним запросом
for post in user.posts: # Без дополнительных запросов
print(post.title)
# ========================================
# Типы eager loading
# ========================================
from sqlalchemy.orm import joinedload, selectinload, subqueryload
with Session(engine) as session:
# joinedload - LEFT OUTER JOIN (один запрос)
users = session.query(User).options(joinedload(User.posts)).all()
# selectinload - два запроса (более эффективно для many-to-many)
users = session.query(User).options(selectinload(User.posts)).all()
# subqueryload - два запроса с подзапросом
users = session.query(User).options(subqueryload(User.posts)).all()
# ========================================
# Ручной JOIN
# ========================================
with Session(engine) as session:
results = (
session.query(User, Post)
.join(Post, User.id == Post.user_id)
.filter(Post.published == True)
.all()
)
for user, post in results:
print(f"{user.username}: {post.title}")N+1 Problem: Главная ловушка ORM
N+1 проблема - это когда вместо одного запроса с JOIN делается N+1 запросов. Это убийца производительности #1 в ORM приложениях.
Пример N+1 проблемы
# ❌ N+1 ПРОБЛЕМА
with Session(engine) as session:
# 1 запрос: получить всех пользователей
users = session.query(User).all() # SELECT * FROM users
# N запросов: для каждого пользователя получить посты
for user in users: # Если 100 пользователей = 100 запросов!
print(f"{user.username} has {len(user.posts)} posts")
# SELECT * FROM posts WHERE user_id = 1
# SELECT * FROM posts WHERE user_id = 2
# SELECT * FROM posts WHERE user_id = 3
# ... 100 раз
# Итого: 101 запрос вместо 1!Решение: Eager Loading
from sqlalchemy.orm import joinedload, selectinload
# ✅ РЕШЕНИЕ 1: joinedload (LEFT OUTER JOIN)
with Session(engine) as session:
users = session.query(User).options(joinedload(User.posts)).all()
# SELECT users.*, posts.*
# FROM users
# LEFT OUTER JOIN posts ON users.id = posts.user_id
for user in users:
print(f"{user.username} has {len(user.posts)} posts") # Без доп. запросов!
# Итого: 1 запрос
# ✅ РЕШЕНИЕ 2: selectinload (два запроса, но эффективнее для больших данных)
with Session(engine) as session:
users = session.query(User).options(selectinload(User.posts)).all()
# Запрос 1: SELECT * FROM users
# Запрос 2: SELECT * FROM posts WHERE user_id IN (1, 2, 3, ...)
for user in users:
print(f"{user.username} has {len(user.posts)} posts")
# Итого: 2 запроса вместо 101!Вложенные relationship
# Модель с тремя уровнями вложенности
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True)
text = Column(Text)
post_id = Column(Integer, ForeignKey("posts.id"))
user_id = Column(Integer, ForeignKey("users.id"))
post = relationship("Post", back_populates="comments")
author = relationship("User")
Post.comments = relationship("Comment", back_populates="post")
# ❌ N+1+N проблема
users = session.query(User).all()
for user in users:
for post in user.posts:
for comment in post.comments:
print(comment.text)
# ✅ Вложенный eager loading
users = (
session.query(User)
.options(
selectinload(User.posts).selectinload(Post.comments)
)
.all()
)
# Или через joinedload для всего дерева
users = (
session.query(User)
.options(
joinedload(User.posts).joinedload(Post.comments)
)
.all()
)Автоматическое обнаружение N+1
# Включите логирование SQL
import logging
logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
# Теперь вы увидите все SQL запросы в консоли
# Если видите десятки одинаковых SELECT - это N+1!Индексы: Ускорение запросов
Правильные индексы - это разница между запросом за 0.5ms и запросом за 5 секунд. Изучите EXPLAIN ANALYZE - это ваш лучший друг.
Типы индексов в PostgreSQL
from sqlalchemy import Index, text
# ========================================
# 1. B-tree индекс (по умолчанию)
# ========================================
# Используется для: =, <, <=, >, >=, BETWEEN, IN, IS NULL
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String(255), index=True) # Простой индекс
username = Column(String(100))
# Композитный индекс
__table_args__ = (
Index("idx_email_username", "email", "username"),
)
# ========================================
# 2. Partial Index (условный индекс)
# ========================================
# Индекс только для части таблицы
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
published = Column(Boolean)
created_at = Column(DateTime)
# Индекс только для опубликованных постов
__table_args__ = (
Index(
"idx_published_posts",
"created_at",
postgresql_where=text("published = true"),
),
)
# ========================================
# 3. GIN индекс для JSONB и массивов
# ========================================
from sqlalchemy.dialects.postgresql import JSONB, ARRAY
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True)
data = Column(JSONB) # JSONB поле
tags = Column(ARRAY(String)) # Массив
__table_args__ = (
Index("idx_product_data", "data", postgresql_using="gin"),
Index("idx_product_tags", "tags", postgresql_using="gin"),
)
# Теперь можно быстро искать по JSON и массивам:
# SELECT * FROM products WHERE data @> '{"color": "red"}'
# SELECT * FROM products WHERE tags @> ARRAY['electronics']
# ========================================
# 4. Full-text search индекс
# ========================================
from sqlalchemy.dialects.postgresql import TSVECTOR
class Article(Base):
__tablename__ = "articles"
id = Column(Integer, primary_key=True)
title = Column(String(255))
content = Column(Text)
search_vector = Column(TSVECTOR) # Полнотекстовый индекс
__table_args__ = (
Index("idx_search_vector", "search_vector", postgresql_using="gin"),
)
# Создание trigger для автообновления search_vector в PostgreSQL:
"""
CREATE TRIGGER articles_search_update
BEFORE INSERT OR UPDATE ON articles
FOR EACH ROW EXECUTE FUNCTION
tsvector_update_trigger(search_vector, 'pg_catalog.english', title, content);
"""EXPLAIN ANALYZE: Анализ запросов
from sqlalchemy import text
with Session(engine) as session:
# ========================================
# Анализ медленного запроса
# ========================================
# Вариант 1: Raw SQL
query = text("""
EXPLAIN ANALYZE
SELECT * FROM users
WHERE email LIKE '%@example.com'
ORDER BY created_at DESC
LIMIT 100
""")
result = session.execute(query)
for row in result:
print(row)
# Вариант 2: SQLAlchemy ORM
from sqlalchemy import select
from sqlalchemy.dialects import postgresql
stmt = (
select(User)
.where(User.email.like("%@example.com"))
.order_by(User.created_at.desc())
.limit(100)
)
# Получить SQL
compiled = stmt.compile(
dialect=postgresql.dialect(),
compile_kwargs={"literal_binds": True}
)
print(compiled)
# Выполнить с EXPLAIN
explain_stmt = text(f"EXPLAIN ANALYZE {compiled}")
result = session.execute(explain_stmt)
for row in result:
print(row)Интерпретация EXPLAIN ANALYZE
-- Пример вывода EXPLAIN ANALYZE
Limit (cost=0.29..856.29 rows=100 width=78) (actual time=0.045..1.234 rows=100 loops=1)
-> Index Scan using idx_email on users (cost=0.29..8562.29 rows=1000 width=78) (actual time=0.044..1.198 rows=100 loops=1)
Index Cond: (email ~~ '%@example.com'::text)
Planning Time: 0.156 ms
Execution Time: 1.289 ms
-- Что смотреть:
-- 1. actual time - реальное время выполнения
-- 2. rows - количество строк
-- 3. Index Scan vs Seq Scan - используется ли индекс
-- 4. cost - относительная стоимость операцииПризнаки проблем:
❌ Seq Scan on large_table - полное сканирование (нужен индекс!)
❌ actual time > 100ms - медленный запрос
❌ rows=1000000 - слишком много строк, нужна фильтрация
❌ Nested Loop - возможно N+1 проблема
✅ Index Scan - используется индекс
✅ Bitmap Heap Scan - эффективное использование индекса
✅ actual time < 10ms - быстрый запрос
Создание индексов для медленных запросов
# Проблема: медленный поиск по email
session.query(User).filter(User.email == "test@example.com").first()
# Seq Scan on users (cost=0.00..431.00 rows=1 width=78) (actual time=45.234..45.234 rows=1 loops=1)
# Решение: добавить индекс
class User(Base):
__tablename__ = "users"
email = Column(String(255), index=True) # ← Добавили индекс
# Или через DDL
with engine.connect() as conn:
conn.execute(text("CREATE INDEX idx_users_email ON users(email)"))
conn.commit()
# Теперь:
# Index Scan using idx_users_email on users (cost=0.29..8.30 rows=1 width=78) (actual time=0.023..0.023 rows=1 loops=1)
# Ускорение: 45ms → 0.023ms (в 2000 раз!)Транзакции и конкуррентность
По умолчанию PostgreSQL использует READ COMMITTED. Для критичных операций (банковские транзакции, inventory) используйте SERIALIZABLE.
Основы транзакций
from sqlalchemy.orm import Session
# ========================================
# Автоматический commit
# ========================================
with Session(engine) as session:
user = User(email="test@example.com", username="test")
session.add(user)
session.commit() # ← Явный commit
# ========================================
# Rollback при ошибке
# ========================================
with Session(engine) as session:
try:
user = User(email="test@example.com", username="test")
session.add(user)
# Какая-то ошибка
raise ValueError("Something went wrong")
session.commit()
except Exception as e:
session.rollback() # ← Откат изменений
print(f"Error: {e}")
# ========================================
# Context manager (автоматический rollback)
# ========================================
try:
with Session(engine) as session:
with session.begin(): # ← Автоматический commit/rollback
user = User(email="test@example.com", username="test")
session.add(user)
# Если нет ошибок → commit
# Если ошибка → rollback
except Exception as e:
print(f"Transaction failed: {e}")Isolation Levels
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
# ========================================
# Установка isolation level
# ========================================
# Глобально для engine
engine = create_engine(
DATABASE_URL,
isolation_level="SERIALIZABLE" # READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, SERIALIZABLE
)
# Или для конкретной сессии
with Session(engine) as session:
session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# Ваши операции
...
# ========================================
# Уровни изоляции в PostgreSQL
# ========================================
# READ COMMITTED (по умолчанию)
# - Видит только committed данные
# - Каждый запрос видит новый snapshot
# - Подходит для большинства случаев
# REPEATABLE READ
# - Весь запрос видит snapshot на момент начала транзакции
# - Защищает от non-repeatable reads
# - Может быть serialization failure (нужен retry)
# SERIALIZABLE
# - Полная изоляция (как будто транзакции выполняются последовательно)
# - Защищает от всех аномалий
# - Высокая вероятность serialization failures
# - Нужен механизм retryПример: банковская транзакция
class Account(Base):
__tablename__ = "accounts"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"))
balance = Column(Integer, default=0) # В копейках
def transfer_money(from_account_id: int, to_account_id: int, amount: int):
"""Перевод денег между счетами с правильной изоляцией"""
max_retries = 3
for attempt in range(max_retries):
try:
with Session(engine) as session:
# SERIALIZABLE для критичных операций
session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
with session.begin():
# SELECT FOR UPDATE - блокировка строк
from_account = (
session.query(Account)
.filter(Account.id == from_account_id)
.with_for_update() # ← Эксклюзивная блокировка
.first()
)
to_account = (
session.query(Account)
.filter(Account.id == to_account_id)
.with_for_update()
.first()
)
# Проверка баланса
if from_account.balance < amount:
raise ValueError("Insufficient funds")
# Перевод
from_account.balance -= amount
to_account.balance += amount
# Commit произойдет автоматически
print(f"Transfer successful: {amount} копеек")
return True
except Exception as e:
if "could not serialize" in str(e).lower():
# Serialization failure - retry
print(f"Serialization failure, retry {attempt + 1}/{max_retries}")
continue
else:
# Другая ошибка
raise
raise Exception("Transfer failed after max retries")Оптимистичные блокировки (Optimistic Locking)
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Session
class Document(Base):
__tablename__ = "documents"
id = Column(Integer, primary_key=True)
title = Column(String(255))
content = Column(Text)
version = Column(Integer, default=1, nullable=False) # ← Version field
# ========================================
# Использование
# ========================================
def update_document(document_id: int, new_content: str):
with Session(engine) as session:
# Читаем документ
doc = session.get(Document, document_id)
old_version = doc.version
# Обновляем
doc.content = new_content
doc.version += 1
# Сохраняем только если версия не изменилась
from sqlalchemy import update
stmt = (
update(Document)
.where(
Document.id == document_id,
Document.version == old_version # ← Проверка версии
)
.values(content=new_content, version=old_version + 1)
)
result = session.execute(stmt)
if result.rowcount == 0:
# Кто-то другой уже обновил документ
raise ValueError("Document was modified by another user")
session.commit()Асинхронная работа с PostgreSQL
asyncpg - самый быстрый PostgreSQL драйвер для Python. В 3-5 раз быстрее psycopg2. Используйте его для high-performance приложений.
asyncpg: Low-level драйвер
import asyncio
import asyncpg
# ========================================
# Подключение
# ========================================
async def main():
# Создание connection pool
pool = await asyncpg.create_pool(
host="localhost",
port=5432,
user="postgres",
password="postgres",
database="myapp",
min_size=10,
max_size=20,
)
# ========================================
# SELECT
# ========================================
# Один результат
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", 1)
print(row["email"])
# Несколько результатов
async with pool.acquire() as conn:
rows = await conn.fetch("SELECT * FROM users WHERE is_active = $1", True)
for row in rows:
print(row["username"])
# ========================================
# INSERT
# ========================================
async with pool.acquire() as conn:
user_id = await conn.fetchval(
"""
INSERT INTO users (email, username, password_hash)
VALUES ($1, $2, $3)
RETURNING id
""",
"test@example.com",
"testuser",
"hashed_password"
)
print(f"Created user: {user_id}")
# ========================================
# Bulk INSERT
# ========================================
async with pool.acquire() as conn:
data = [
("alice@example.com", "alice", "hash1"),
("bob@example.com", "bob", "hash2"),
]
await conn.executemany(
"INSERT INTO users (email, username, password_hash) VALUES ($1, $2, $3)",
data
)
# ========================================
# COPY (самый быстрый способ bulk insert)
# ========================================
async with pool.acquire() as conn:
# Подготовка данных
data = [
(f"user{i}@example.com", f"user{i}", f"hash{i}")
for i in range(10000)
]
# COPY FROM (в разы быстрее executemany)
await conn.copy_records_to_table(
"users",
records=data,
columns=["email", "username", "password_hash"]
)
# ========================================
# Транзакции
# ========================================
async with pool.acquire() as conn:
async with conn.transaction():
# Все или ничего
await conn.execute("INSERT INTO users (...) VALUES (...)")
await conn.execute("INSERT INTO posts (...) VALUES (...)")
# Автоматический commit или rollback
# Закрытие pool
await pool.close()
# Запуск
asyncio.run(main())SQLAlchemy 2.0 + asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, selectinload
from sqlalchemy import select
# ========================================
# Настройка async engine
# ========================================
# Обратите внимание: postgresql+asyncpg://
DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:5432/myapp"
async_engine = create_async_engine(
DATABASE_URL,
echo=True,
pool_size=10,
max_overflow=20,
)
# Session maker
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False,
)
# ========================================
# Модели (те же, что и для sync)
# ========================================
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
# ... те же поля
class Post(Base):
__tablename__ = "posts"
# ... те же поля
# ========================================
# CRUD операции (async)
# ========================================
async def create_user(email: str, username: str):
async with AsyncSessionLocal() as session:
user = User(email=email, username=username, password_hash="hash")
session.add(user)
await session.commit()
await session.refresh(user)
return user
async def get_user(user_id: int):
async with AsyncSessionLocal() as session:
user = await session.get(User, user_id)
return user
async def get_users_with_posts():
async with AsyncSessionLocal() as session:
# Async version of selectinload
stmt = select(User).options(selectinload(User.posts))
result = await session.execute(stmt)
users = result.scalars().all()
return users
async def update_user(user_id: int, new_username: str):
async with AsyncSessionLocal() as session:
user = await session.get(User, user_id)
user.username = new_username
await session.commit()
async def delete_user(user_id: int):
async with AsyncSessionLocal() as session:
user = await session.get(User, user_id)
await session.delete(user)
await session.commit()
# ========================================
# Использование в FastAPI
# ========================================
from fastapi import FastAPI, Depends
from typing import AsyncGenerator
app = FastAPI()
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
user = await db.get(User, user_id)
return user
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
stmt = select(User).limit(100)
result = await db.execute(stmt)
users = result.scalars().all()
return usersМиграции с Alembic
Alembic - это Git для вашей базы данных. Каждое изменение схемы - отдельный коммит (миграция), который можно откатить.
Установка и инициализация
# Установка
pip install alembic
# Инициализация
alembic init alembic
# Структура проекта:
# alembic/
# versions/ ← Здесь будут миграции
# env.py ← Настройки Alembic
# alembic.ini ← КонфигурацияНастройка alembic
# alembic/env.py
from sqlalchemy import engine_from_config, pool
from alembic import context
from myapp.models import Base # ← Ваши модели
# Настройка target_metadata
target_metadata = Base.metadata
def run_migrations_online():
# URL из переменной окружения
import os
from sqlalchemy import create_engine
url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/myapp")
connectable = create_engine(url)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()Создание миграций
# Автогенерация миграции из изменений моделей
alembic revision --autogenerate -m "Add users table"
# Создастся файл: alembic/versions/xxxxx_add_users_table.py
# Применить миграцию
alembic upgrade head
# Откатить миграцию
alembic downgrade -1
# История миграций
alembic history
# Текущая версия БД
alembic currentПример миграции
# alembic/versions/xxxxx_add_users_table.py
"""Add users table
Revision ID: xxxxx
Revises:
Create Date: 2024-01-01 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'xxxxx'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# Создание таблицы
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(length=255), nullable=False),
sa.Column('username', sa.String(length=100), nullable=False),
sa.Column('password_hash', sa.String(length=255), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
# Создание индекса
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
def downgrade():
# Откат изменений
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')Продвинутые возможности PostgreSQL
JSONB: NoSQL внутри SQL
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import func
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True)
name = Column(String(255))
attributes = Column(JSONB) # JSON поле
# ========================================
# Вставка JSON данных
# ========================================
product = Product(
name="Laptop",
attributes={
"brand": "Apple",
"specs": {
"cpu": "M2",
"ram": 16,
"storage": 512
},
"colors": ["silver", "space gray"]
}
)
# ========================================
# Запросы к JSONB
# ========================================
# Поиск по ключу верхнего уровня
products = session.query(Product).filter(
Product.attributes["brand"].astext == "Apple"
).all()
# Поиск по вложенному ключу
products = session.query(Product).filter(
Product.attributes["specs"]["cpu"].astext == "M2"
).all()
# Проверка существования ключа
products = session.query(Product).filter(
Product.attributes.has_key("warranty")
).all()
# Содержит JSON
products = session.query(Product).filter(
Product.attributes.contains({"brand": "Apple"})
).all()
# Поиск в массиве
products = session.query(Product).filter(
Product.attributes["colors"].contains(["silver"])
).all()Полнотекстовый поиск
from sqlalchemy.dialects.postgresql import TSVECTOR
from sqlalchemy import func
class Article(Base):
__tablename__ = "articles"
id = Column(Integer, primary_key=True)
title = Column(String(255))
content = Column(Text)
search_vector = Column(TSVECTOR)
# ========================================
# Полнотекстовый поиск
# ========================================
# Создание поискового вектора (в PostgreSQL)
# UPDATE articles SET search_vector =
# to_tsvector('english', title || ' ' || content);
# Поиск
search_query = "python postgresql"
results = session.query(Article).filter(
Article.search_vector.match(search_query)
).all()
# Поиск с ранжированием
results = (
session.query(
Article,
func.ts_rank(Article.search_vector, func.to_tsquery(search_query)).label("rank")
)
.filter(Article.search_vector.match(search_query))
.order_by(text("rank DESC"))
.all()
)Партиционирование (Partitioning)
-- Создание партиционированной таблицы в PostgreSQL
-- Партиционирование по диапазону (дата)
CREATE TABLE events (
id SERIAL,
event_type VARCHAR(50),
data JSONB,
created_at TIMESTAMP NOT NULL
) PARTITION BY RANGE (created_at);
-- Создание партиций
CREATE TABLE events_2024_01 PARTITION OF events
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE events_2024_02 PARTITION OF events
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- Вставка автоматически попадет в нужную партицию
INSERT INTO events (event_type, data, created_at)
VALUES ('click', '{"page": "/home"}', '2024-01-15');
-- Запрос автоматически использует нужную партицию
SELECT * FROM events WHERE created_at BETWEEN '2024-01-01' AND '2024-01-31';# В SQLAlchemy партиционирование делается через raw SQL
# или через DDL events
from sqlalchemy import event, DDL
# Создание партиций при создании таблицы
event.listen(
Base.metadata,
'after_create',
DDL("""
CREATE TABLE IF NOT EXISTS events_2024_01 PARTITION OF events
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
""")
)Production Best Practices
Connection Pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
# ========================================
# Connection Pool настройки
# ========================================
poolclass=QueuePool,
pool_size=20, # Постоянные подключения
max_overflow=10, # Дополнительные при пиковой нагрузке
pool_timeout=30, # Таймаут ожидания свободного подключения
pool_recycle=3600, # Пересоздание подключений каждый час
pool_pre_ping=True, # Проверка подключения перед использованием
echo_pool=True, # Логирование pool events
)
# Итого: 20 постоянных + до 10 временных = максимум 30 подключенийОптимизация производительности
# ========================================
# 1. Bulk операции вместо циклов
# ========================================
# ❌ Медленно
for data in large_dataset:
user = User(**data)
session.add(user)
session.commit() # 10,000 коммитов!
# ✅ Быстро
session.bulk_insert_mappings(User, large_dataset)
session.commit() # Один коммит
# ========================================
# 2. Batch обработка
# ========================================
# Обработка по частям для экономии памяти
from sqlalchemy import select
stmt = select(User).execution_options(yield_per=1000)
with Session(engine) as session:
for partition in session.execute(stmt).partitions():
for user in partition:
# Обработка
pass
# ========================================
# 3. Отключение автоflush для bulk операций
# ========================================
with Session(engine) as session:
session.autoflush = False
for i in range(10000):
user = User(email=f"user{i}@example.com", username=f"user{i}")
session.add(user)
if i % 1000 == 0:
session.flush() # Ручной flush каждые 1000 записей
session.commit()
# ========================================
# 4. Read-only запросы
# ========================================
# Пометить запрос как read-only (оптимизация PostgreSQL)
stmt = select(User).execution_options(postgresql_readonly=True)Мониторинг и логирование
import logging
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
# ========================================
# Логирование медленных запросов
# ========================================
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
conn.info.setdefault('query_start_time', []).append(time.time())
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total_time = time.time() - conn.info['query_start_time'].pop()
if total_time > 0.1: # Логировать запросы дольше 100ms
logger = logging.getLogger("slow_queries")
logger.warning(
f"Slow query ({total_time:.3f}s): {statement[:200]}"
)Заключение
PostgreSQL + Python - мощная комбинация для любого проекта: от стартапа до enterprise. Знание SQLAlchemy, индексов и асинхронности сделает вас незаменимым разработчиком.
Ключевые выводы
- SQLAlchemy ORM vs Core - ORM для простоты, Core для производительности
- N+1 проблема - главный враг производительности в ORM, решается через eager loading
- Индексы критичны - разница между 0.5ms и 5 секундами
- EXPLAIN ANALYZE - ваш лучший друг для оптимизации
- Транзакции и изоляция - правильный isolation level спасает от багов
- asyncpg - самый быстрый драйвер для high-performance приложений
- Миграции обязательны - Alembic делает изменения схемы безопасными
- Connection pooling - критично для production нагрузки
Чек-лист для production
- Настроен connection pool (pool_size, max_overflow)
- Включен pool_pre_ping для обнаружения мертвых подключений
- Созданы индексы для всех частых WHERE/JOIN условий
- Проверены медленные запросы через EXPLAIN ANALYZE
- Настроены миграции через Alembic
- Eager loading для предотвращения N+1
- Мониторинг медленных запросов (logging)
- Backup стратегия для данных
- Правильный isolation level для критичных транзакций
- Использование asyncpg/async SQLAlchemy для высоконагруженных сервисов
Следующие шаги
- Практика - создайте реальное приложение с PostgreSQL
- Оптимизация - найдите медленные запросы через EXPLAIN ANALYZE
- Масштабирование - изучите репликацию и шардирование
- Мониторинг - настройте pg_stat_statements и pgBadger
- Продвинутые фичи - PostGIS для геоданных, pg_trgm для fuzzy search
Готовы к следующему уровню? Прочитайте мою статью «PostgreSQL для бэкенд-разработчиков: 7 паттернов, которые я использую каждый день» — глубокий разбор JSONB, partial indexes, generated columns, row-level security, advisory locks, materialized views, partitioning и продвинутого мониторинга. С реальными кейсами из production и граблями, на которые я наступил за 8 лет работы.
Полезные ресурсы
Официальная документация:
Инструменты:
- pgAdmin - GUI для PostgreSQL
- DBeaver - универсальный DB клиент
- pgcli - улучшенный psql с автодополнением
- pg_stat_statements - анализ запросов
Книги:
- «PostgreSQL: Up and Running» - Regina Obe, Leo Hsu
- «High Performance PostgreSQL» - Ibrar Ahmed
Помните: База данных - это фундамент приложения. Инвестиции в изучение PostgreSQL и правильные паттерны работы с ним окупятся многократно через производительность, надежность и масштабируемость вашего проекта.