Перейти к содержимому

pandas для бэкенд-разработчика: логи, Excel/1С, ETL и отчёты — почему я не использовал его раньше

Константин Потапов
30 мин

Полное руководство по использованию pandas в бэкенд-разработке: анализ логов, работа с Excel/1С, ETL-пайплайны, валидация данных и генерация отчётов. Практические кейсы с кодом и реальной историей из практики.

pandas для бэкенд-разработчика: логи, Excel/1С, ETL и отчёты — почему я не использовал его раньше

Раньше я анализировал логи с помощью grep, awk и костыльных Python-скриптов. Потом попробовал pandas — и понял, что терял часы там, где можно было потратить минуты.

Многие Python-разработчики считают pandas инструментом исключительно для data science. Но на практике pandas решает десятки задач в бэкенд-разработке: от анализа логов до генерации отчётов. Разберём, зачем знать pandas каждому Python-разработчику и как применять его в реальных проектах.

Для кого: Python-разработчики (Junior/Middle), которые хотят эффективно работать с данными в бэкенд-приложениях.

Результат:

  • 📊 Поймёте, где pandas эффективнее SQL/ORM
  • 🔍 Научитесь анализировать логи и метрики
  • 📁 Автоматизируете анализ данных из Excel/1С
  • ⚙️ Создадите ETL-пайплайн для обработки данных
  • ✅ Настроите валидацию и очистку данных
  • 📄 Сгенерируете отчёты для бизнеса с маржинальностью

Хотите сразу к практике?

Почему бэкенд-разработчику нужен pandas?

Типичная ситуация

# Задача: найти топ-10 эндпоинтов по времени ответа за последнюю неделю
# Вариант 1: Чистый Python (15+ строк с циклами, dict comprehensions)
# Вариант 2: SQL запрос + ORM (нужно писать миграцию для analytics таблицы)
# Вариант 3: pandas (3 строки кода)
 
import pandas as pd
 
df = pd.read_csv('api_logs.csv')
top_slow = df.nlargest(10, 'response_time')[['endpoint', 'response_time']]
print(top_slow)

pandas — это инструмент для анализа и трансформации данных, когда:

  • Нужно быстро проанализировать данные без написания SQL-запросов
  • Данные приходят из разных источников (CSV, JSON, Excel, БД)
  • Требуется сложная группировка, агрегация или преобразование
  • Надо сгенерировать отчёт для бизнеса

Важно: pandas не заменяет SQL/ORM для операций с БД в продакшене. Это инструмент для анализа, ETL и генерации отчётов.

Реальный кейс: как я научился ценить простые решения

История из практики

Консультировал владельца сети магазинов по развертыванию его аналитической системы. Честно признаюсь — приехал с предубеждениями и ушёл с важным уроком.

Что я ожидал увидеть:

  • Очередную "костыльную" разработку
  • Непродуманную архитектуру
  • Код, который "нужно срочно переписать"

Что я увидел на самом деле:

  • Windows-сервер с Django
  • Excel-файлы с выгрузками из 1С
  • Скрипты на Python с pandas для анализа продаж, остатков, маржинальности
  • Простая визуализация результатов

Моя первая реакция (ошибочная):

"Зачем такой подход, когда есть Power BI, Tableau, modern stack с PostgreSQL?"

Чему я научился у бизнесмена

1. Ценность скорости над совершенством

Его система давала ответы за 5 минут. "Правильные" BI-системы требовали недель на настройку дашбордов. Пока конкуренты ждали отчётов, он уже корректировал закупки.

2. Гибкость важнее архитектуры

Изменить логику расчёта маржинальности — 20 минут в Jupyter Notebook. В корпоративной BI — тикет в отдел аналитики, неделя ожидания.

3. Результат важнее кода

Код не был идеальным. Но он отвечал на вопрос: "Какие товары убыточны и что с ними делать?". Каждый день. Без сбоев.

4. Знание инструментов решает задачи

Владелец не был программистом, но освоил pandas настолько, чтобы быстро анализировать данные. Это дало ему конкурентное преимущество, которое не купишь за деньги.

Что я понял

Ошибка разработчика: думать о технологиях, а не о задаче. Подход бизнесмена: решить задачу максимально быстро и эффективно.

pandas + Excel — не "костыль", а осознанный выбор инструмента под задачу:

  • Нет зависимости от аналитиков
  • Быстрая итерация гипотез
  • Низкий порог входа для коллег
  • Результат здесь и сейчас

Вывод для разработчиков

Если вы хотите создавать реальную ценность — научитесь решать бизнес-задачи, а не просто писать "правильный код".

pandas в этом — ваш союзник:

  • Быстрый анализ данных без настройки инфраструктуры
  • Проверка гипотез за минуты, а не дни
  • Понятный код, который может поддерживать не только вы

Главный урок: Простое решение, которое работает, лучше идеального решения, которое никогда не будет готово.

Вывод: pandas — это не просто библиотека для data science. Это инструмент для принятия бизнес-решений, и его знание делает вас более ценным специалистом.

Когда pandas НЕ нужен

Прежде чем погружаться в примеры, важно понять границы применимости pandas. Это сэкономит вам время и избавит от проблем в продакшене.

❌ НЕ используйте pandas для:

1. Транзакционных операций в продакшене

# ❌ ПЛОХО: запись в БД через pandas
df.to_sql('users', engine, if_exists='append')
# Проблемы: медленно, нет поддержки транзакций, сложная обработка ошибок
 
# ✅ ХОРОШО: используйте ORM или bulk insert
session.bulk_insert_mappings(User, records)

2. Работы с данными > 1GB в памяти

# ❌ ПЛОХО: загрузка 10GB файла
df = pd.read_csv('huge_file.csv')  # OOM (Out of Memory)
 
# ✅ ХОРОШО: используйте chunked reading, Dask или DuckDB
for chunk in pd.read_csv('huge_file.csv', chunksize=10000):
    process_chunk(chunk)

3. Real-time обработки данных

# ❌ ПЛОХО: pandas в FastAPI endpoint для каждого запроса
@app.get("/stats")
def get_stats():
    df = pd.read_sql("SELECT * FROM orders", engine)  # Медленно!
    return df.groupby('product').sum().to_dict()
 
# ✅ ХОРОШО: предрассчитайте агрегаты или используйте SQL/Redis

4. Данных, требующих ACID гарантий

# ❌ ПЛОХО: изменение данных через pandas с сохранением в БД
df = pd.read_sql("SELECT * FROM accounts", engine)
df.loc[df['id'] == 123, 'balance'] -= 100  # Нет транзакции!
df.to_sql('accounts', engine, if_exists='replace')  # Потеря данных при ошибке, нет аудита
 
# ✅ ХОРОШО: используйте SQL транзакции с записью операции
with session.begin():
    # Блокируем счёт для обновления
    account = session.query(Account).filter_by(id=123).with_for_update().first()
    account.balance -= 100
 
    # Записываем транзакцию для аудита
    transaction = Transaction(
        account_id=123,
        amount=-100,
        type='withdrawal',
        timestamp=datetime.now()
    )
    session.add(transaction)

5. Простых операций с БД, которые решаются SQL

# ❌ ПЛОХО: загружаем всё в pandas для простой фильтрации
df = pd.read_sql("SELECT * FROM users", engine)
active_users = df[df['is_active'] == True]
 
# ✅ ХОРОШО: используйте SQL
df = pd.read_sql("SELECT * FROM users WHERE is_active = TRUE", engine)

✅ Используйте pandas для:

  1. Ad-hoc анализа данных — быстрые одноразовые исследования ("ad-hoc" = по мере необходимости, разовые задачи без автоматизации)
    • Пример: "Найди топ-10 продуктов за вчера" — написал 5 строк в Jupyter, получил ответ
  2. ETL пайплайнов — загрузка из разных источников, трансформация, экспорт
  3. Генерации отчётов — Excel, PDF, графики для бизнеса
  4. Валидации данных — проверка CSV перед импортом в БД
  5. Прототипирования — быстрая проверка гипотез на данных

Золотое правило: Если данные помещаются в память (< 50% RAM) и не требуют ACID — pandas подходит. Если данные > 1GB или нужны транзакции — используйте SQL/Dask/Spark.

Основы pandas: DataFrame и Series

DataFrame — таблица данных

DataFrame — это двумерная таблица с типизированными колонками. Аналог spreadsheet (Excel) или SQL-таблицы.

import pandas as pd
 
# Создание DataFrame из dict
data = {
    'user_id': [1, 2, 3, 4],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana'],
    'age': [25, 30, 35, 28],
    'city': ['Moscow', 'SPb', 'Moscow', 'Kazan']
}
 
df = pd.DataFrame(data)
print(df)
 
#    user_id     name  age     city
# 0        1    Alice   25   Moscow
# 1        2      Bob   30      SPb
# 2        3  Charlie   35   Moscow
# 3        4    Diana   28    Kazan

Series — одна колонка (вектор данных)

Series — это одномерный массив с индексом. Каждая колонка DataFrame — это Series.

# Получение колонки
ages = df['age']
print(type(ages))  # <class 'pandas.core.series.Series'>
 
# Операции над Series
print(ages.mean())     # 29.5 (среднее)
print(ages.max())      # 35
print(ages.std())      # 4.2 (стандартное отклонение)

Базовые операции

# Фильтрация
moscow_users = df[df['city'] == 'Moscow']
 
# Сортировка
sorted_by_age = df.sort_values('age', ascending=False)
 
# Выбор колонок
names_and_ages = df[['name', 'age']]
 
# Добавление колонки
df['is_adult'] = df['age'] >= 18
 
# Группировка и агрегация
city_stats = df.groupby('city')['age'].agg(['mean', 'count'])

Частая ошибка: Путать .loc[] (доступ по label) и .iloc[] (доступ по позиции). Используйте .loc[] для фильтрации по условиям, .iloc[] для доступа по индексу.

Use-case 1: Анализ логов и метрик

Задача: найти проблемные эндпоинты

У вас есть access.log с миллионом строк. Нужно найти:

  • Топ-10 медленных эндпоинтов
  • Эндпоинты с высоким процентом 5xx ошибок
  • Распределение времени ответа по часам

Структура лога:

2025-12-12 10:15:23 | GET /api/users | 200 | 45ms
2025-12-12 10:15:24 | POST /api/orders | 201 | 120ms
2025-12-12 10:15:25 | GET /api/products | 500 | 2300ms

Решение с pandas

import pandas as pd
from datetime import datetime
 
# 1. Чтение логов (предположим, они в CSV)
df = pd.read_csv(
    'access.log',
    sep='|',
    names=['timestamp', 'method_path', 'status', 'response_time'],
    parse_dates=['timestamp']  # Автоматический парсинг дат
)
 
# 2. Очистка данных (убираем пробелы!)
df['response_time'] = df['response_time'].str.strip().str.replace('ms', '').astype(int)
df[['method', 'endpoint']] = df['method_path'].str.strip().str.split(' ', expand=True)
df['status'] = df['status'].astype(int)
# .str.strip() критичен — логи часто содержат пробелы вокруг разделителей
 
# 3. Топ-10 медленных эндпоинтов
top_slow = (
    df.groupby('endpoint')['response_time']
    .agg(['mean', 'max', 'count'])
    .sort_values('mean', ascending=False)
    .head(10)
)
 
print("Топ-10 медленных эндпоинтов:")
print(top_slow)
 
# 4. Эндпоинты с высоким процентом 5xx ошибок (векторизовано!)
error_rate = (
    df.assign(is_5xx=df['status'] >= 500)
    .groupby('endpoint')['is_5xx']
    .mean()
    .mul(100)
    .sort_values(ascending=False)
    .head(10)
)
 
print("\nЭндпоинты с высоким % ошибок:")
print(error_rate)
 
# 5. Распределение времени ответа по часам
df['hour'] = df['timestamp'].dt.hour
hourly_stats = df.groupby('hour')['response_time'].agg(['mean', 'median', 'count'])
 
print("\nРаспределение по часам:")
print(hourly_stats)

Визуализация (опционально)

import matplotlib.pyplot as plt
 
# График среднего времени ответа по часам
hourly_stats['mean'].plot(kind='line', title='Среднее время ответа по часам')
plt.xlabel('Час дня')
plt.ylabel('Среднее время (ms)')
plt.savefig('response_time_by_hour.png')

Результат: Вместо написания сложных SQL-запросов или циклов на Python, вы получили полный анализ в 20 строках кода.

Use-case 2: Анализ данных из Excel/1С

Задача: анализ продаж из выгрузки 1С

Типичная ситуация в российских компаниях: бухгалтерия работает в 1С, данные выгружаются в Excel, а бизнес хочет понять:

  • Какие товары продаются лучше всего?
  • Какая маржинальность по категориям?
  • Где теряем деньги?

Исходные данные: Excel-файл sales.xlsx с листами "Продажи", "Остатки", "Себестоимость"

import pandas as pd
 
# 1. Загрузка данных из Excel (разные листы)
sales = pd.read_excel('sales.xlsx', sheet_name='Продажи')
stock = pd.read_excel('sales.xlsx', sheet_name='Остатки')
costs = pd.read_excel('sales.xlsx', sheet_name='Себестоимость')
 
# 2. Очистка названий колонок (1С любит пробелы и кириллицу)
sales.columns = sales.columns.str.strip()
stock.columns = stock.columns.str.strip()
costs.columns = costs.columns.str.strip()
 
# 3. Объединение данных
# Добавляем себестоимость к продажам
df = sales.merge(
    costs[['Артикул', 'Себестоимость']],
    on='Артикул',
    how='left'
)
 
# 4. Расчёт маржинальности (с защитой от деления на 0)
df['Маржа'] = df['Цена продажи'] - df['Себестоимость']
df['Маржинальность %'] = (
    df['Маржа'] / df['Цена продажи'].replace(0, float('nan')) * 100
).round(2)
 
# 5. Анализ: топ товаров по маржинальности
top_margin = (
    df.groupby(['Категория', 'Название товара'])
    .agg({
        'Количество': 'sum',
        'Маржа': 'sum',
        'Маржинальность %': 'mean'
    })
    .sort_values('Маржа', ascending=False)
    .head(20)
)
 
print("Топ-20 товаров по марже:")
print(top_margin)
 
# 6. Анализ: убыточные товары
unprofitable = df[df['Маржа'] < 0].groupby('Категория').agg({
    'Название товара': 'count',
    'Маржа': 'sum'
})
 
print("\nУбыточные товары по категориям:")
print(unprofitable)
 
# 7. Сводная таблица для руководства (требуется колонка "Дата" в исходных данных!)
pivot = df.pivot_table(
    values=['Количество', 'Маржа'],
    index='Категория',
    columns=pd.to_datetime(df['Дата']).dt.month,
    aggfunc='sum',
    fill_value=0
)
 
print("\nПродажи и маржа по месяцам:")
print(pivot)
 
# 8. Экспорт результатов в новый Excel
with pd.ExcelWriter('analysis_results.xlsx') as writer:
    top_margin.to_excel(writer, sheet_name='Топ товары')
    unprofitable.to_excel(writer, sheet_name='Убыточные')
    pivot.to_excel(writer, sheet_name='Динамика по месяцам')
 
print("\n✅ Анализ завершён: analysis_results.xlsx")

Добавляем автоматизацию (production-ready версия)

Часто такой анализ нужно делать регулярно. Создадим функцию с обработкой ошибок и логированием:

import logging
from pathlib import Path
from datetime import datetime
from typing import Optional
import pandas as pd
 
# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
 
class SalesAnalysisError(Exception):
    """Ошибка при анализе продаж."""
    pass
 
def analyze_1c_sales(
    file_path: str,
    output_dir: str = 'reports',
    required_sheets: Optional[list] = None
) -> str:
    """
    Автоматический анализ продаж из выгрузки 1С с обработкой ошибок.
 
    Args:
        file_path: Путь к Excel-файлу с выгрузкой из 1С
        output_dir: Директория для сохранения отчёта
        required_sheets: Список обязательных листов (по умолчанию: Продажи, Себестоимость)
 
    Returns:
        Путь к созданному отчёту
 
    Raises:
        SalesAnalysisError: При ошибках валидации или обработки данных
    """
    if required_sheets is None:
        required_sheets = ['Продажи', 'Себестоимость']
 
    try:
        logger.info(f"Начинаем анализ файла: {file_path}")
 
        # 1. Проверка существования файла
        if not Path(file_path).exists():
            raise SalesAnalysisError(f"Файл не найден: {file_path}")
 
        # 2. Создаём директорию для отчётов
        Path(output_dir).mkdir(parents=True, exist_ok=True)
 
        # 3. Загрузка данных с валидацией
        try:
            excel_file = pd.ExcelFile(file_path)
            available_sheets = excel_file.sheet_names
 
            # Проверяем наличие обязательных листов
            missing_sheets = set(required_sheets) - set(available_sheets)
            if missing_sheets:
                raise SalesAnalysisError(
                    f"Отсутствуют обязательные листы: {missing_sheets}. "
                    f"Доступные листы: {available_sheets}"
                )
 
            sales = pd.read_excel(excel_file, sheet_name='Продажи')
            costs = pd.read_excel(excel_file, sheet_name='Себестоимость')
 
        except ValueError as e:
            raise SalesAnalysisError(f"Ошибка чтения Excel-файла: {e}")
 
        # 4. Валидация структуры данных
        required_sales_cols = ['Артикул', 'Количество', 'Цена продажи', 'Категория', 'Название товара']
        required_costs_cols = ['Артикул', 'Себестоимость']
 
        missing_sales_cols = set(required_sales_cols) - set(sales.columns)
        missing_costs_cols = set(required_costs_cols) - set(costs.columns)
 
        if missing_sales_cols:
            raise SalesAnalysisError(f"В листе 'Продажи' отсутствуют колонки: {missing_sales_cols}")
        if missing_costs_cols:
            raise SalesAnalysisError(f"В листе 'Себестоимость' отсутствуют колонки: {missing_costs_cols}")
 
        logger.info(f"Загружено {len(sales)} записей о продажах, {len(costs)} записей о себестоимости")
 
        # 5. Очистка данных
        sales.columns = sales.columns.str.strip()
        costs.columns = costs.columns.str.strip()
 
        # Удаляем пустые строки
        sales = sales.dropna(subset=['Артикул'])
        costs = costs.dropna(subset=['Артикул'])
 
        # 6. Объединение и расчёт маржи
        df = sales.merge(
            costs[['Артикул', 'Себестоимость']],
            on='Артикул',
            how='left'
        )
 
        # Проверяем наличие товаров без себестоимости
        missing_costs = df['Себестоимость'].isna().sum()
        if missing_costs > 0:
            logger.warning(f"Найдено {missing_costs} товаров без себестоимости")
            df['Себестоимость'] = df['Себестоимость'].fillna(0)
 
        df['Маржа'] = df['Цена продажи'] - df['Себестоимость']
        # Защита от деления на 0
        df['Маржинальность %'] = (
            df['Маржа'] / df['Цена продажи'].replace(0, float('nan')) * 100
        ).round(2)
 
        # 7. Анализ
        top_margin = df.groupby(['Категория', 'Название товара']).agg({
            'Количество': 'sum',
            'Маржа': 'sum',
            'Маржинальность %': 'mean'
        }).sort_values('Маржа', ascending=False).head(20)
 
        unprofitable = df[df['Маржа'] < 0].groupby('Категория').agg({
            'Название товара': 'count',
            'Маржа': 'sum'
        })
 
        # 8. Экспорт отчёта
        timestamp = datetime.now().strftime('%Y%m%d_%H%M')
        output_file = Path(output_dir) / f"sales_analysis_{timestamp}.xlsx"
 
        with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
            top_margin.to_excel(writer, sheet_name='Топ товары')
 
            if not unprofitable.empty:
                unprofitable.to_excel(writer, sheet_name='Убыточные')
            else:
                logger.info("Убыточных товаров не найдено")
 
            # Сводная статистика
            summary = pd.DataFrame({
                'Метрика': [
                    'Всего продаж',
                    'Общая маржа',
                    'Средняя маржинальность',
                    'Убыточных товаров'
                ],
                'Значение': [
                    f"{df['Количество'].sum():,.0f} шт",
                    f"{df['Маржа'].sum():,.2f} ₽",
                    f"{df['Маржинальность %'].mean():.2f}%",
                    len(df[df['Маржа'] < 0])
                ]
            })
            summary.to_excel(writer, sheet_name='Сводка', index=False)
 
        logger.info(f"✅ Отчёт успешно создан: {output_file}")
        return str(output_file)
 
    except SalesAnalysisError:
        raise
    except Exception as e:
        logger.error(f"Неожиданная ошибка при анализе: {e}", exc_info=True)
        raise SalesAnalysisError(f"Ошибка обработки данных: {e}")
 
# Использование с обработкой ошибок
try:
    report = analyze_1c_sales('sales.xlsx')
    print(f"✅ Отчёт готов: {report}")
except SalesAnalysisError as e:
    print(f"❌ Ошибка: {e}")
    # Отправка уведомления администратору
except Exception as e:
    print(f"❌ Критическая ошибка: {e}")
    # Логирование в Sentry/другую систему мониторинга

Production-ready код: Теперь функция проверяет входные данные, обрабатывает ошибки, логирует прогресс и корректно сообщает о проблемах. Такой код можно использовать в автоматизированных процессах.

Практический совет: Такие скрипты можно завернуть в простой веб-интерфейс (Flask/FastAPI) — директор загружает Excel, получает отчёт. За 2 часа работы вы создадите инструмент, который сэкономит часы аналитики каждую неделю.

Use-case 3: ETL и data pipelines

Задача: синхронизация данных между системами

Часто нужно:

  • Загрузить данные из одной БД
  • Трансформировать их (переименовать колонки, изменить формат)
  • Обогатить данными из внешнего API
  • Сохранить в другую БД или отправить в другой сервис

Пример: экспорт пользователей из PostgreSQL в Excel для маркетинга

import pandas as pd
from sqlalchemy import create_engine
 
# 1. Подключение к БД
engine = create_engine('postgresql://user:password@localhost:5432/mydb')
 
# 2. Загрузка данных
query = """
    SELECT
        u.id,
        u.email,
        u.created_at,
        u.city,
        COUNT(o.id) as orders_count,
        SUM(o.total) as total_spent
    FROM users u
    LEFT JOIN orders o ON u.id = o.user_id
    WHERE u.created_at >= '2025-01-01'
    GROUP BY u.id, u.email, u.created_at, u.city
"""
 
df = pd.read_sql(query, engine)
 
# 3. Трансформация данных
df['created_at'] = pd.to_datetime(df['created_at'])
df['registration_month'] = df['created_at'].dt.to_period('M')
 
# Категоризация пользователей (векторизованный подход)
import numpy as np
 
# ✅ Используем np.select вместо apply() — в 10-100 раз быстрее!
conditions = [
    df['total_spent'] > 10000,      # Условие для VIP
    df['orders_count'] > 5,          # Условие для Active
    df['orders_count'] > 0           # Условие для Buyer
]
choices = ['VIP', 'Active', 'Buyer']
df['category'] = np.select(conditions, choices, default='Registered')
 
# Почему это быстрее?
# apply() вызывает Python-функцию для КАЖДОЙ строки (построчно)
# np.select() выполняет операцию сразу над всем массивом в C/NumPy
# На 100k строк: apply() ~2 сек, np.select() ~20 мс
 
# 4. Добавление данных из внешнего источника (например, город -> регион)
city_to_region = {
    'Moscow': 'Central',
    'SPb': 'Northwest',
    'Kazan': 'Volga'
}
df['region'] = df['city'].map(city_to_region)
 
# 5. Экспорт в Excel с форматированием
with pd.ExcelWriter('users_report.xlsx', engine='openpyxl') as writer:
    # Основной отчёт
    df.to_excel(writer, sheet_name='Users', index=False)
 
    # Сводная таблица по категориям
    summary = df.groupby('category').agg({
        'id': 'count',
        'total_spent': 'sum'
    }).rename(columns={'id': 'users_count'})
 
    summary.to_excel(writer, sheet_name='Summary')

Pipeline для ежедневной обработки

import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
 
def daily_etl_pipeline(date: datetime):
    """
    Ежедневный ETL: загружает данные за день, обрабатывает, сохраняет.
    """
    # 1. Загрузка данных
    raw_data_path = f'raw_data/{date.strftime("%Y-%m-%d")}.csv'
    df = pd.read_csv(raw_data_path)
 
    # 2. Валидация
    required_columns = ['user_id', 'event_type', 'timestamp']
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
 
    # 3. Очистка
    df = df.dropna(subset=['user_id'])  # Удаляем строки без user_id
    df = df.drop_duplicates()           # Удаляем дубликаты
 
    # 4. Преобразование типов
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['user_id'] = df['user_id'].astype(int)
 
    # 5. Фильтрация аномалий
    df = df[df['timestamp'].dt.date == date.date()]  # Только данные за нужный день
 
    # 6. Агрегация
    daily_stats = df.groupby('user_id').agg({
        'event_type': 'count',
        'timestamp': ['min', 'max']
    }).reset_index()
 
    # 7. Сохранение результата
    output_path = f'processed_data/{date.strftime("%Y-%m-%d")}_stats.parquet'
    daily_stats.to_parquet(output_path, compression='snappy')
 
    return daily_stats
 
# Запуск пайплайна
yesterday = datetime.now() - timedelta(days=1)
stats = daily_etl_pipeline(yesterday)
print(f"Processed {len(stats)} users")

Совет: Используйте Parquet вместо CSV для хранения промежуточных результатов — он в 5-10 раз быстрее и занимает меньше места.

Use-case 4: Валидация и очистка данных

Задача: проверить CSV перед импортом в БД

Пользователь загружает CSV-файл с клиентами. Нужно:

  • Проверить наличие обязательных полей
  • Валидировать email и телефоны
  • Удалить дубликаты
  • Привести данные к единому формату
import pandas as pd
import re
 
def validate_and_clean_csv(file_path: str) -> pd.DataFrame:
    """
    Валидация и очистка CSV с клиентами.
    """
 
    # 1. Загрузка
    df = pd.read_csv(file_path)
 
    # 2. Проверка обязательных колонок
    required = ['name', 'email', 'phone']
    missing = set(required) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
 
    # 3. Удаление пустых строк
    df = df.dropna(subset=required)
 
    # 4. Валидация email (векторизовано!)
    email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    df['email_valid'] = df['email'].str.match(email_pattern, na=False)
 
    invalid_emails = df[~df['email_valid']]
    if len(invalid_emails) > 0:
        print(f"⚠️ Found {len(invalid_emails)} invalid emails:")
        print(invalid_emails[['name', 'email']].head())
 
    df = df[df['email_valid']].drop('email_valid', axis=1)
 
    # 5. Очистка телефонов (приведение к единому формату)
    def clean_phone(phone):
        # Удаляем все кроме цифр
        digits = re.sub(r'\D', '', str(phone))
        # Добавляем +7 если начинается с 8 или 9
        if digits.startswith('8') and len(digits) == 11:
            digits = '7' + digits[1:]
        elif digits.startswith('9') and len(digits) == 10:
            digits = '7' + digits
        return f"+{digits}"
 
    df['phone'] = df['phone'].apply(clean_phone)
 
    # 6. Удаление дубликатов по email
    duplicates = df[df.duplicated(subset=['email'], keep=False)]
    if len(duplicates) > 0:
        print(f"⚠️ Found {len(duplicates)} duplicate emails")
        print(duplicates[['name', 'email']])
 
    df = df.drop_duplicates(subset=['email'], keep='first')
 
    # 7. Приведение имени к title case
    df['name'] = df['name'].str.strip().str.title()
 
    return df
 
# Использование
try:
    clean_df = validate_and_clean_csv('clients.csv')
    clean_df.to_csv('clients_clean.csv', index=False)
    print(f"✅ Successfully cleaned {len(clean_df)} records")
except Exception as e:
    print(f"❌ Validation failed: {e}")

Продвинутая валидация с отчётом

def create_validation_report(df: pd.DataFrame) -> dict:
    """
    Создаёт детальный отчёт о качестве данных.
    """
 
    report = {
        'total_rows': len(df),
        'missing_values': df.isnull().sum().to_dict(),
        'duplicates': {
            'email': df['email'].duplicated().sum(),
            'phone': df['phone'].duplicated().sum()
        },
        'data_types': df.dtypes.astype(str).to_dict(),
        'unique_values': {
            col: df[col].nunique()
            for col in df.columns
        }
    }
 
    # Статистика для числовых колонок
    numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
    if len(numeric_cols) > 0:
        report['numeric_stats'] = df[numeric_cols].describe().to_dict()
 
    # Топ-10 самых частых значений для категориальных колонок
    categorical_cols = df.select_dtypes(include=['object']).columns
    report['top_values'] = {
        col: df[col].value_counts().head(10).to_dict()
        for col in categorical_cols
    }
 
    return report
 
# Генерация отчёта
report = create_validation_report(df)
print(f"Total rows: {report['total_rows']}")
print(f"Missing values: {report['missing_values']}")
print(f"Email duplicates: {report['duplicates']['email']}")

Use-case 5: Генерация отчётов

Задача: ежемесячный отчёт для бизнеса в Excel

Бизнес хочет получать Excel-файл с несколькими листами:

  • Общая статистика за месяц
  • Топ-10 клиентов
  • Динамика продаж по дням
  • Разбивка по городам
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime
 
def generate_monthly_report(year: int, month: int):
    """
    Генерирует ежемесячный отчёт для бизнеса.
    """
 
    engine = create_engine('postgresql://user:password@localhost:5432/mydb')
 
    # 1. Загрузка данных за месяц (безопасный параметризованный запрос)
    from datetime import datetime
    from dateutil.relativedelta import relativedelta
 
    # Создаём границы периода
    start_date = datetime(year, month, 1)
    end_date = start_date + relativedelta(months=1)
 
    # ✅ Параметризованный запрос (защита от SQL-инъекции)
    query = """
        SELECT
            o.id as order_id,
            o.created_at,
            o.total,
            o.status,
            u.id as user_id,
            u.email,
            u.city
        FROM orders o
        JOIN users u ON o.user_id = u.id
        WHERE o.created_at >= :start_date AND o.created_at < :end_date
    """
 
    df = pd.read_sql(query, engine, params={'start_date': start_date, 'end_date': end_date})
    df['created_at'] = pd.to_datetime(df['created_at'])
    df['date'] = df['created_at'].dt.date
 
    # 2. Создание Excel с множеством листов
    output_file = f'monthly_report_{year}_{month:02d}.xlsx'
 
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        # Лист 1: Общая статистика
        summary = pd.DataFrame({
            'Метрика': [
                'Всего заказов',
                'Сумма продаж',
                'Средний чек',
                'Уникальных клиентов',
                'Завершённых заказов',
                'Отменённых заказов'
            ],
            'Значение': [
                len(df),
                f"{df['total'].sum():,.2f} ₽",
                f"{df['total'].mean():,.2f} ₽",
                df['user_id'].nunique(),
                len(df[df['status'] == 'completed']),
                len(df[df['status'] == 'cancelled'])
            ]
        })
        summary.to_excel(writer, sheet_name='Общая статистика', index=False)
 
        # Лист 2: Топ-10 клиентов
        top_clients = (
            df.groupby(['user_id', 'email'])
            .agg({
                'order_id': 'count',
                'total': 'sum'
            })
            .rename(columns={
                'order_id': 'Количество заказов',
                'total': 'Сумма покупок'
            })
            .sort_values('Сумма покупок', ascending=False)
            .head(10)
            .reset_index()
        )
        top_clients.to_excel(writer, sheet_name='Топ-10 клиентов', index=False)
 
        # Лист 3: Динамика по дням
        daily_sales = (
            df.groupby('date')
            .agg({
                'order_id': 'count',
                'total': 'sum'
            })
            .rename(columns={
                'order_id': 'Заказов',
                'total': 'Выручка'
            })
            .reset_index()
        )
        daily_sales.to_excel(writer, sheet_name='Динамика по дням', index=False)
 
        # Лист 4: Разбивка по городам
        city_stats = (
            df.groupby('city')
            .agg({
                'order_id': 'count',
                'total': ['sum', 'mean'],
                'user_id': 'nunique'
            })
            .round(2)
        )
        city_stats.columns = ['Заказов', 'Выручка', 'Средний чек', 'Клиентов']
        city_stats.to_excel(writer, sheet_name='По городам')
 
    print(f"✅ Отчёт сохранён: {output_file}")
    return output_file
 
# Генерация отчёта
report_file = generate_monthly_report(2025, 12)

Форматирование Excel с помощью openpyxl

from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment
 
def format_excel_report(file_path: str):
    """
    Добавляет форматирование к Excel отчёту.
    """
 
    wb = load_workbook(file_path)
 
    # Форматирование для всех листов
    for sheet_name in wb.sheetnames:
        ws = wb[sheet_name]
 
        # Заголовки: жирный шрифт, серый фон
        header_fill = PatternFill(start_color="CCCCCC", end_color="CCCCCC", fill_type="solid")
        header_font = Font(bold=True)
 
        for cell in ws[1]:  # Первая строка (заголовки)
            cell.fill = header_fill
            cell.font = header_font
            cell.alignment = Alignment(horizontal='center')
 
        # Автоширина колонок
        for column in ws.columns:
            max_length = 0
            column_letter = column[0].column_letter
            for cell in column:
                try:
                    if len(str(cell.value)) > max_length:
                        max_length = len(str(cell.value))
                except:
                    pass
            adjusted_width = min(max_length + 2, 50)
            ws.column_dimensions[column_letter].width = adjusted_width
 
    wb.save(file_path)
    print(f"✅ Форматирование применено: {file_path}")
 
# Применение форматирования
format_excel_report(report_file)

pandas vs SQL/ORM: когда что использовать?

Используйте pandas, когда:

Ad-hoc анализ данных

# Быстрая проверка: сколько пользователей зарегистрировано за неделю?
df = pd.read_sql("SELECT * FROM users WHERE created_at >= NOW() - INTERVAL '7 days'", engine)
print(df.groupby(df['created_at'].dt.date).size())

Данные из разных источников

# Объединение данных из БД и CSV
users_db = pd.read_sql("SELECT * FROM users", engine)
purchases_csv = pd.read_csv('external_purchases.csv')
merged = users_db.merge(purchases_csv, on='user_id')

Сложные трансформации

# Pivot таблица, агрегация по нескольким измерениям
pivot = df.pivot_table(
    values='revenue',
    index='date',
    columns=['city', 'category'],
    aggfunc=['sum', 'mean']
)

Генерация отчётов (Excel, PDF)

df.to_excel('report.xlsx', sheet_name='Sales')

Используйте SQL/ORM, когда:

Продакшен-операции с БД

# Запись в БД в продакшене
# ❌ df.to_sql('users', engine)  # Медленно для больших данных
# ✅ Используйте bulk_insert_mappings или COPY

Транзакционные операции

# Требуется ACID, rollback, constraints
with session.begin():
    session.add(user)
    session.add(order)

Работа с большими данными (> 1GB)

# Обработка 10 млн строк — используйте SQL + индексы
# pandas загрузит всё в память и может упасть

Гибридный подход (лучшая практика)

# Используйте SQL для фильтрации, pandas для анализа
query = """
    SELECT * FROM orders
    WHERE created_at >= '2025-01-01'
    AND status = 'completed'
    LIMIT 100000
"""
df = pd.read_sql(query, engine)  # SQL: фильтрация
top_products = df.groupby('product_id')['quantity'].sum().nlargest(10)  # pandas: агрегация

Альтернативы pandas: когда их выбирать?

pandas — стандарт де-факто, но есть современные альтернативы с лучшей производительностью. Вот сравнение:

КритерийpandasPolarsDuckDBКогда выбрать
Скорость (малые данные <100MB)⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐Polars для скорости
Скорость (большие данные 1-10GB)⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐Polars/DuckDB
Использование памяти⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐Polars эффективнее
Экосистема и библиотеки⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐pandas для совместимости
Порог входа⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐pandas/DuckDB проще
SQL-подобный синтаксисDuckDB для SQL-фанов
Lazy evaluationPolars/DuckDB
Поддержка команды⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐pandas стандарт

Практические примеры

pandas (классика):

import pandas as pd
 
df = pd.read_csv('data.csv')
result = (
    df.groupby('category')['sales']
    .sum()
    .sort_values(ascending=False)
    .head(10)
)

Polars (скорость):

import polars as pl
 
# В 5-10 раз быстрее pandas!
df = pl.read_csv('data.csv')
result = (
    df.lazy()  # Ленивое выполнение
    .group_by('category')
    .agg(pl.col('sales').sum())
    .sort('sales', descending=True)
    .limit(10)
    .collect()  # Выполнение
)

DuckDB (SQL поверх данных):

import duckdb
 
# SQL синтаксис, работает с pandas DataFrame
df = pd.read_csv('data.csv')
result = duckdb.sql("""
    SELECT category, SUM(sales) as total_sales
    FROM df
    GROUP BY category
    ORDER BY total_sales DESC
    LIMIT 10
""").to_df()

Когда переходить с pandas?

Оставайтесь на pandas, если:

  • Работаете с данными < 1GB
  • Нужна совместимость с библиотеками (scikit-learn, matplotlib)
  • Команда знает pandas

Переходите на Polars, если:

  • Нужна максимальная скорость обработки
  • Работаете с данными 1-50GB
  • Готовы к менее зрелой экосистеме

Используйте DuckDB, если:

  • Любите SQL и хотите его использовать для анализа
  • Нужно работать с данными > 10GB (out-of-memory обработка)
  • Хотите встроенную аналитическую БД без сервера

Практический совет: Начните с pandas. Когда упрётесь в производительность — попробуйте Polars или DuckDB. Они совместимы с pandas API, так что миграция будет простой.

Производительность и оптимизация

1. Выбор формата файла: benchmarks

Выбор формата хранения данных критичен для производительности. Вот ориентировочные измерения на файле 1GB (5 млн строк, 10 колонок):

Важно: Цифры зависят от SSD/HDD, CPU, типов данных и уровня сжатия. Используйте как ориентир, а не абсолютные значения.

ФорматЧтениеЗаписьРазмер на дискеСжатиеСовместимостьКогда использовать
CSV12.3 сек8.7 сек1.0 GB1.0x⭐⭐⭐⭐⭐Обмен данными, человекочитаемость
Parquet (snappy)1.4 сек2.1 сек240 MB4.2x⭐⭐⭐⭐Рекомендуется для промежуточных данных
Parquet (gzip)2.8 сек15.3 сек150 MB6.8x⭐⭐⭐⭐Долгосрочное хранение, архивы
Feather0.9 сек1.7 сек680 MB1.5x⭐⭐Обмен между Python/R
HDF53.2 сек4.5 сек580 MB1.7x⭐⭐⭐Сложные иерархические данные
Pickle1.1 сек0.8 сек950 MB1.1xТолько для внутреннего использования

Рекомендации:

# ✅ Parquet (snappy) — золотая середина для большинства задач
df.to_parquet('data.parquet', compression='snappy', index=False)
df = pd.read_parquet('data.parquet')
 
# ✅ CSV — только для обмена с внешними системами
df.to_csv('export.csv', index=False)
 
# ❌ Pickle — НЕ используйте для долгосрочного хранения
# Причины: несовместим между версиями pandas, уязвимость безопасности

Практический совет: Используйте Parquet с compression='snappy' для всех промежуточных результатов. Выигрыш: в 8x быстрее чтения + в 4x меньше места.

2. Используйте типы данных эффективно

# ❌ Плохо: всё загружается как object (строки)
df = pd.read_csv('large_file.csv')
 
# ✅ Хорошо: указываем типы явно
df = pd.read_csv(
    'large_file.csv',
    dtype={
        'user_id': 'int32',  # int32 вместо int64 если значения < 2 млрд
        'category': 'category',  # Экономит память для повторяющихся строк
        'price': 'float32'
    },
    parse_dates=['created_at']
)
 
# Проверка использования памяти
print(df.memory_usage(deep=True))
 
# Пример экономии памяти
# int64 → int32: экономия 50% памяти (8 байт → 4 байта)
# object → category: экономия 90% для повторяющихся значений

3. Chunked reading для больших файлов

# ❌ Плохо: загружает весь файл в память
df = pd.read_csv('huge_file.csv')  # 10GB файл — OOM
 
# ✅ Хорошо: обрабатываем по частям
chunks = []
for chunk in pd.read_csv('huge_file.csv', chunksize=10000):
    # Обработка chunk
    filtered = chunk[chunk['status'] == 'active']
    chunks.append(filtered)
 
df = pd.concat(chunks, ignore_index=True)

4. Vectorized operations вместо apply: почему это критично?

Что происходит под капотом?

❌ apply() с lambda — Python построчно:

# Медленно: каждая строка — вызов Python-функции
df['price_with_tax'] = df.apply(lambda row: row['price'] * 1.2, axis=1)
 
# Что происходит:
# for row in df.iterrows():  # Python цикл
#     result = lambda(row)    # Вызов Python-функции
#     # GIL блокирует параллелизм
#     # Overhead интерпретатора на каждой итерации

✅ Векторизованные операции — NumPy/C:

# Быстро: операция над всем массивом сразу
df['price_with_tax'] = df['price'] * 1.2
 
# Что происходит:
# Операция делегируется в NumPy (написан на C)
# Использует SIMD инструкции процессора (параллельная обработка)
# Нет overhead Python-интерпретатора
# Работает в 10-100 раз быстрее

Практическое сравнение

import pandas as pd
import numpy as np
import time
 
# Создаём тестовый DataFrame (100k строк)
df = pd.DataFrame({
    'price': np.random.randint(100, 10000, 100000),
    'quantity': np.random.randint(1, 100, 100000)
})
 
# Тест 1: apply() с lambda
start = time.time()
df['total_apply'] = df.apply(lambda row: row['price'] * row['quantity'], axis=1)
print(f"apply(): {time.time() - start:.3f} сек")  # ~1.2 сек
 
# Тест 2: Векторизованная операция
start = time.time()
df['total_vector'] = df['price'] * df['quantity']
print(f"vectorized: {time.time() - start:.3f} сек")  # ~0.003 сек
 
# Разница: 400x быстрее! ⚡

Когда apply() допустим?

# ✅ Допустимо: сложная логика, которую нельзя векторизовать
def complex_transformation(row):
    # Логика с вызовом внешнего API, регулярных выражений, etc.
    if row['status'] == 'pending' and row['amount'] > threshold:
        return external_api_call(row['id'])
    return None
 
df['result'] = df.apply(complex_transformation, axis=1)
 
# Но даже здесь лучше:
# 1. Сначала отфильтровать нужные строки
# 2. Применить apply() только к ним
filtered = df[(df['status'] == 'pending') & (df['amount'] > threshold)]
filtered['result'] = filtered.apply(complex_transformation, axis=1)

Альтернативы apply() для условной логики

# ❌ Медленно: apply для категоризации
df['category'] = df.apply(
    lambda row: 'expensive' if row['price'] > 1000 else 'cheap',
    axis=1
)
 
# ✅ Быстро: np.where (для 2 вариантов)
df['category'] = np.where(df['price'] > 1000, 'expensive', 'cheap')
 
# ✅ Быстро: np.select (для 3+ вариантов)
conditions = [
    df['price'] > 5000,
    df['price'] > 1000,
    df['price'] > 500
]
choices = ['premium', 'expensive', 'medium']
df['category'] = np.select(conditions, choices, default='cheap')
 
# ✅ Быстро: pd.cut (для диапазонов)
df['price_range'] = pd.cut(
    df['price'],
    bins=[0, 500, 1000, 5000, float('inf')],
    labels=['cheap', 'medium', 'expensive', 'premium']
)

Золотое правило: Если видите apply() в коде — всегда спросите себя: "Можно ли это векторизовать?". В 90% случаев — можно, и это даст прирост производительности в 10-100 раз.

Частые ошибки и как их избежать

Даже опытные разработчики наступают на одни и те же грабли при работе с pandas. Разберём самые критичные ошибки.

1. SettingWithCopyWarning — самая коварная ошибка

# ❌ ОШИБКА: модификация потенциальной копии
subset = df[df['sales'] > 100]
subset['discount'] = 0.1  # ⚠️ SettingWithCopyWarning!
# Изменения могут не примениться или применить к оригиналу
 
# ✅ ПРАВИЛЬНО: явное указание намерений
# Способ 1: модифицируем оригинальный DataFrame
df.loc[df['sales'] > 100, 'discount'] = 0.1
 
# Способ 2: создаём явную копию
subset = df[df['sales'] > 100].copy()
subset['discount'] = 0.1

Почему это происходит? pandas не может определить, хотите ли вы работать с копией или с оригиналом. Используйте .loc[] для модификации оригинала или .copy() для явной копии.

2. Игнорирование категориальных данных

# ❌ ПЛОХО: 1GB памяти для повторяющихся строк
df = pd.read_csv('data.csv')
print(df['country'].memory_usage(deep=True))  # 1GB
 
# ✅ ХОРОШО: 100MB памяти с category
df['country'] = df['country'].astype('category')
print(df['country'].memory_usage(deep=True))  # 100MB
 
# Экономия: 10x для колонок с малым числом уникальных значений

Когда использовать category:

  • Колонка имеет < 50% уникальных значений
  • Строковые данные (статусы, категории, города)
  • Повторяющиеся значения

3. Чтение всего файла в память

# ❌ ПЛОХО: падает на больших файлах
df = pd.read_csv('huge_10gb.csv')  # OutOfMemoryError
 
# ✅ ХОРОШО: обработка по частям
result = []
for chunk in pd.read_csv('huge_10gb.csv', chunksize=50000):
    # Фильтруем и обрабатываем только нужное
    filtered = chunk[chunk['status'] == 'active']
    result.append(filtered)
 
df = pd.concat(result, ignore_index=True)

4. Не указывать dtype при чтении

# ❌ МЕДЛЕННО: pandas угадывает типы для каждой колонки
df = pd.read_csv('data.csv')  # 10 секунд
 
# ✅ БЫСТРО: явно указываем типы
dtypes = {
    'id': 'int32',
    'price': 'float32',
    'category': 'category',
    'created_at': 'str'  # Парсим отдельно
}
df = pd.read_csv(
    'data.csv',
    dtype=dtypes,
    parse_dates=['created_at']
)  # 2 секунды
 
# Выигрыш: 5x скорость + меньше памяти

5. Использование iterrows() для больших данных

# ❌ КОШМАР: 1000 строк = 5 секунд
total = 0
for index, row in df.iterrows():
    total += row['price'] * row['quantity']
 
# ✅ ОТЛИЧНО: 1000 строк = 0.001 секунда
total = (df['price'] * df['quantity']).sum()
 
# Разница: 5000x быстрее!

Правило: Если вы используете iterrows() — вы делаете что-то не так. Почти всегда есть векторизованное решение.

6. Смешивание бизнес-логики и обработки данных

# ❌ ПЛОХО: всё в одной функции
def process_sales(df):
    # 50 строк нечитаемого кода
    df['margin'] = df['price'] - df['cost']
    df['category'] = df.apply(lambda x: categorize(x), axis=1)
    df['bonus'] = df.apply(lambda x: calculate_bonus(x), axis=1)
    # ... ещё 40 строк
    return df
 
# ✅ ХОРОШО: разделение ответственности
class SalesProcessor:
    @staticmethod
    def calculate_margin(df):
        """Расчёт маржи."""
        return df['price'] - df['cost']
 
    @staticmethod
    def categorize_products(df):
        """Категоризация товаров."""
        conditions = [df['margin'] > 100, df['margin'] > 50]
        choices = ['Premium', 'Standard']
        return np.select(conditions, choices, default='Budget')
 
    @staticmethod
    def calculate_bonuses(df):
        """Расчёт бонусов."""
        return np.where(df['category'] == 'Premium', df['margin'] * 0.1, 0)
 
    def process(self, df):
        """Основной метод обработки."""
        df = df.copy()
        df['margin'] = self.calculate_margin(df)
        df['category'] = self.categorize_products(df)
        df['bonus'] = self.calculate_bonuses(df)
        return df

Практический совет: Каждая функция должна делать одну вещь хорошо. Это упрощает тестирование, отладку и повторное использование кода.

Чек-лист перед commit

Проверьте свой pandas-код на эти ошибки:

  • Нет SettingWithCopyWarning
  • Используются категориальные типы где нужно
  • Большие файлы читаются чанками или с явными dtypes
  • Нет iterrows() или apply() там, где можно векторизовать
  • Бизнес-логика отделена от обработки данных
  • Есть обработка ошибок для операций с файлами

Type hints для pandas-кода

Type hints делают код более понятным и помогают IDE находить ошибки. Вот основные паттерны для pandas:

Базовые аннотации

import pandas as pd
from typing import List, Dict, Optional
 
def load_sales_data(file_path: str) -> pd.DataFrame:
    """Загружает данные о продажах из CSV."""
 
    return pd.read_csv(file_path)
 
def filter_by_category(
    df: pd.DataFrame,
    categories: List[str]
) -> pd.DataFrame:
    """Фильтрует данные по категориям."""
 
    return df[df['category'].isin(categories)]
 
def calculate_metrics(df: pd.DataFrame) -> Dict[str, float]:
    """Рассчитывает метрики."""
 
    return {
        'total_sales': float(df['sales'].sum()),
        'avg_price': float(df['price'].mean()),
        'max_quantity': int(df['quantity'].max())
    }

Работа с Series

def get_column(df: pd.DataFrame, column: str) -> pd.Series:
    """Возвращает колонку как Series."""
 
    return df[column]
 
def process_series(series: pd.Series) -> pd.Series:
    """Обрабатывает Series."""
 
    return series.fillna(0).astype(int)

Optional для возможных ошибок

def find_product(
    df: pd.DataFrame,
    product_id: int
) -> Optional[pd.Series]:
    """Ищет продукт по ID."""
 
    result = df[df['id'] == product_id]
    if result.empty:
        return None
    return result.iloc[0]

Современный подход с pandas-stubs

Для продвинутой типизации установите pandas-stubs:

pip install pandas-stubs

Это даёт:

  • Автодополнение методов pandas в IDE
  • Проверку типов с помощью mypy
  • Лучшую документацию кода
from pandas import DataFrame, Series
from typing import Literal
 
def aggregate_data(
    df: DataFrame,
    column: str,
    method: Literal['sum', 'mean', 'count']
) -> float:
    """Агрегирует данные с проверкой типов."""
 
    if method == 'sum':
        return float(df[column].sum())
    elif method == 'mean':
        return float(df[column].mean())
    else:
        return float(df[column].count())

Практический совет: Начните с аннотации входных и выходных параметров функций. Это уже даст 80% пользы type hints при минимальных усилиях.

Заключение

pandas — must-have инструмент для Python-разработчика, даже если вы не занимаетесь data science. Основные преимущества:

  • Скорость разработки: 3 строки вместо 30 строк чистого Python
  • Гибкость: работа с любыми форматами данных (CSV, JSON, Excel, SQL, Parquet)
  • Мощные операции: группировка, агрегация, join, pivot за 1 строку кода
  • Интеграция: легко подключается к БД, API, файлам

Что делать дальше:

Прямо сейчас (5 минут)

  1. Установите pandas: pip install pandas openpyxl sqlalchemy
  2. Возьмите свой access.log → скопируйте код из Use-case 1 → получите топ медленных эндпоинтов

На этой неделе (1 час)

  1. Возьмите Excel-выгрузку из 1С → используйте Use-case 2 → сгенерируйте отчёт по маржинальности
  2. Замените один apply() на векторизованную операцию в вашем коде → замерьте разницу

В ближайший месяц

  1. Создайте production-ready ETL с обработкой ошибок и логированием (Use-case 3)
  2. Добавьте type hints к своим pandas-функциям
  3. Попробуйте Polars или DuckDB на задаче, где pandas тормозит

Главное: Не откладывайте! Возьмите первый кейс из статьи, примените к своим данным прямо сейчас. Через 15 минут вы увидите результат, который раньше делали бы часами.

Чек-лист внедрения pandas в ваш проект

Шаг 1: Установка

  • pip install pandas openpyxl sqlalchemy — установить зависимости
  • Добавить в requirements.txt или pyproject.toml

Шаг 2: Первый кейс

  • Выбрать один use-case из статьи (логи/Excel/валидация)
  • Запустить код на ваших данных
  • Сохранить результат в Parquet вместо CSV

Шаг 3: Production-готовность

  • Добавить обработку ошибок (try/except) к pandas-коду
  • Добавить логирование прогресса (logging вместо print)
  • Указать явные dtypes при чтении больших файлов
  • Использовать chunked reading для файлов > 1GB

Шаг 4: Качество кода

  • Заменить хотя бы один apply() на векторизованную операцию
  • Добавить type hints (pd.DataFrame, pd.Series)
  • Написать минимальный тест (проверка schema результата)

Шаг 5: Автоматизация

  • Завернуть скрипт в функцию с параметрами
  • Добавить в cron/Airflow/CI для регулярного запуска
  • Настроить алерты при ошибках обработки

Полезные ресурсы