Перейти к содержимому
Technical Lead & Performance Engineer2024
#Python#pytest#Flask/FastAPI#cProfile#Memory Profiler#Pandas#Plotly

Route Profiler — Автоматический профилинг производительности

Автоматизированная система профилирования всех маршрутов веб-приложения. Динамическое обнаружение роутов, проверка наличия тестовых данных и измерение производительности каждого endpoint'а с генерацией детальных отчетов.

Контекст проекта

Проблема производительности веб-приложений:

По мере роста веб-приложения появляются десятки или сотни API endpoint'ов и страниц. Каждый из них может иметь проблемы производительности, которые незаметны в development, но критичны в production:

Типичные сценарии:

  • Новый роут добавлен без performance review
  • N+1 queries проблема в ORM (Eloquent, SQLAlchemy, Django ORM)
  • Медленная сериализация больших объектов
  • Memory leaks в long-running handlers
  • Неоптимальные database indexes
  • Избыточные API calls к внешним сервисам

Почему manual profiling не масштабируется:

  • Невозможно проверить все роуты вручную
  • Нет систематичности — проверяют только "подозрительные"
  • Отсутствие baseline — непонятно, что "медленно", а что нормально
  • No regression tracking — деградация performance незаметна
  • Missing test data — некоторые роуты вообще не тестируются

Задача

Создать автоматизированную систему профилирования, которая:

  • Автоматически находит все роуты приложения (Django)
  • Проверяет наличие mock/fixture данных для каждого роута
  • Ругается если данных нет (prevent untested routes)
  • Профилирует производительность: время, память, DB queries
  • Генерирует отчеты с визуализацией и alerting
  • Интегрируется в CI/CD для continuous performance monitoring
  • Отслеживает регрессии через baseline comparison

Решение

Архитектура системы

┌──────────────────────────────────────────────────────┐
│          Route Discovery Engine                      │
│   Автоматическое обнаружение всех роутов             │
│               (Django urls)                          │
└────────────────┬─────────────────────────────────────┘


┌──────────────────────────────────────────────────────┐
│        Fixture Validation Layer                      │
│   Проверка наличия mock/fixture для роута            │
│   Если нет → FAIL с сообщением                       │
└────────────────┬─────────────────────────────────────┘


┌──────────────────────────────────────────────────────┐
│          Profiling Execution Engine                  │
│                                                      │
│  1. Time profiling (cProfile, line_profiler)         │
│  2. Memory profiling (memory_profiler, tracemalloc)  │
│  3. Database query tracking (SQLAlchemy events)      │
│  4. Request/Response size measurement                │
└────────────────┬─────────────────────────────────────┘


┌──────────────────────────────────────────────────────┐
│         Metrics Collection & Analysis                │
│                                                      │
│  - Execution time (total, per-function)              │
│  - Memory usage (peak, allocated)                    │
│  - Database queries (count, time, duplicates)        │
│  - CPU utilization                                   │
│  - Bottleneck identification                         │
└────────────────┬─────────────────────────────────────┘


┌──────────────────────────────────────────────────────┐
│         Report Generation & Visualization            │
│                                                      │
│  - HTML report с графиками (Plotly)                  │
│  - CSV export для trending analysis                  │
│  - JSON для programmatic access                      │
│  - Alerts для регрессий                              │
└──────────────────────────────────────────────────────┘

Технологическая реализация

1. Route Discovery Engine

Автоматическое обнаружение роутов:

from typing import List, Dict, Any
from dataclasses import dataclass
import inspect
 
@dataclass
class RouteInfo:
    """Информация о маршруте"""
    path: str
    method: str
    handler: callable
    handler_name: str
    module: str
    requires_auth: bool
    params: List[str]  # URL parameters
    query_params: List[str]  # Query string parameters
 
class RouteDiscovery:
    """Обнаружение всех роутов в приложении"""
 
    @staticmethod
    def discover_flask_routes(app) -> List[RouteInfo]:
        """Извлечение роутов из Flask приложения"""
        routes = []
 
        for rule in app.url_map.iter_rules():
            # Пропускаем статические файлы
            if rule.endpoint == 'static':
                continue
 
            # Получаем handler функцию
            handler = app.view_functions[rule.endpoint]
 
            # Анализируем параметры
            route_params = list(rule.arguments)
 
            # Анализируем query params через signature
            sig = inspect.signature(handler)
            query_params = [
                param for param in sig.parameters.keys()
                if param not in route_params and param != 'self'
            ]
 
            # Проверяем auth requirements
            requires_auth = hasattr(handler, '_login_required') or \
                          'login_required' in str(handler.__code__.co_names)
 
            for method in rule.methods:
                if method in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
                    routes.append(RouteInfo(
                        path=str(rule),
                        method=method,
                        handler=handler,
                        handler_name=handler.__name__,
                        module=handler.__module__,
                        requires_auth=requires_auth,
                        params=route_params,
                        query_params=query_params
                    ))
 
        return routes
 
    @staticmethod
    def discover_fastapi_routes(app) -> List[RouteInfo]:
        """Извлечение роутов из FastAPI приложения"""
        routes = []
 
        for route in app.routes:
            if hasattr(route, 'methods'):
                for method in route.methods:
                    # Извлекаем path parameters
                    path_params = [
                        param.name for param in route.param_convertors.values()
                    ] if hasattr(route, 'param_convertors') else []
 
                    # Анализируем query parameters
                    sig = inspect.signature(route.endpoint)
                    query_params = [
                        name for name, param in sig.parameters.items()
                        if name not in path_params
                    ]
 
                    routes.append(RouteInfo(
                        path=route.path,
                        method=method,
                        handler=route.endpoint,
                        handler_name=route.endpoint.__name__,
                        module=route.endpoint.__module__,
                        requires_auth='Depends' in str(sig),
                        params=path_params,
                        query_params=query_params
                    ))
 
        return routes
 
    @staticmethod
    def discover_django_routes() -> List[RouteInfo]:
        """Извлечение роутов из Django приложения"""
        from django.urls import get_resolver
 
        routes = []
        url_patterns = get_resolver().url_patterns
 
        def extract_routes(patterns, prefix=''):
            for pattern in patterns:
                if hasattr(pattern, 'url_patterns'):
                    # Nested URLconf
                    extract_routes(pattern.url_patterns, prefix + str(pattern.pattern))
                else:
                    # Actual route
                    path = prefix + str(pattern.pattern)
                    callback = pattern.callback
 
                    routes.append(RouteInfo(
                        path=path,
                        method='GET',  # Django doesn't specify in URL
                        handler=callback,
                        handler_name=callback.__name__,
                        module=callback.__module__,
                        requires_auth=False,  # Check через decorator inspection
                        params=[],
                        query_params=[]
                    ))
 
        extract_routes(url_patterns)
        return routes

2. Fixture Validation Layer

Проверка наличия тестовых данных:

import pytest
from typing import Dict, Any, Optional
 
class FixtureRegistry:
    """Реестр fixtures для каждого роута"""
 
    def __init__(self):
        self._fixtures: Dict[str, Dict[str, Any]] = {}
 
    def register(
        self,
        route_path: str,
        method: str,
        fixture_data: Dict[str, Any]
    ):
        """Регистрация fixture для роута"""
        key = f"{method}:{route_path}"
        self._fixtures[key] = fixture_data
 
    def get(self, route_path: str, method: str) -> Optional[Dict[str, Any]]:
        """Получение fixture для роута"""
        key = f"{method}:{route_path}"
        return self._fixtures.get(key)
 
    def has_fixture(self, route_path: str, method: str) -> bool:
        """Проверка наличия fixture"""
        return f"{method}:{route_path}" in self._fixtures
 
 
# Глобальный реестр
fixture_registry = FixtureRegistry()
 
 
def route_fixture(path: str, method: str = 'GET'):
    """Decorator для регистрации fixtures"""
    def decorator(func):
        # Выполняем fixture функцию и сохраняем результат
        fixture_data = func()
        fixture_registry.register(path, method, fixture_data)
        return func
    return decorator
 
 
# Пример использования
@route_fixture('/api/users/<int:user_id>', 'GET')
def user_detail_fixture():
    return {
        'path_params': {'user_id': 123},
        'query_params': {},
        'headers': {'Authorization': 'Bearer test_token'},
        'expected_status': 200,
        'setup': lambda: create_test_user(id=123),
        'teardown': lambda: delete_test_user(123)
    }
 
@route_fixture('/api/users/<int:user_id>/orders', 'GET')
def user_orders_fixture():
    return {
        'path_params': {'user_id': 123},
        'query_params': {'page': 1, 'limit': 100},
        'headers': {'Authorization': 'Bearer test_token'},
        'expected_status': 200,
        'setup': lambda: create_test_user_with_orders(user_id=123, order_count=1000),
        'teardown': lambda: cleanup_test_data()
    }

Pytest плагин для валидации:

class RouteFixtureValidationPlugin:
    """Pytest plugin для проверки coverage fixtures"""
 
    def __init__(self, app, route_discovery: RouteDiscovery):
        self.app = app
        self.route_discovery = route_discovery
        self.routes = route_discovery.discover_flask_routes(app)
 
    def pytest_collection_modifyitems(self, items):
        """Добавляем тест для каждого роута без fixture"""
 
        missing_fixtures = []
 
        for route in self.routes:
            key = f"{route.method}:{route.path}"
            if not fixture_registry.has_fixture(route.path, route.method):
                missing_fixtures.append(route)
 
        if missing_fixtures:
            # Создаем failing тест
            test_item = MissingFixtureTest(
                name='test_all_routes_have_fixtures',
                parent=items[0].parent if items else None,
                missing_routes=missing_fixtures
            )
            items.insert(0, test_item)
 
 
class MissingFixtureTest(pytest.Item):
    """Failing тест если есть роуты без fixtures"""
 
    def __init__(self, name, parent, missing_routes):
        super().__init__(name, parent)
        self.missing_routes = missing_routes
 
    def runtest(self):
        if self.missing_routes:
            routes_str = '\n'.join([
                f"  - {r.method} {r.path}{r.handler_name}"
                for r in self.missing_routes
            ])
 
            pytest.fail(
                f"\n❌ Missing fixtures for {len(self.missing_routes)} routes:\n\n"
                f"{routes_str}\n\n"
                f"Please add fixtures using @route_fixture decorator.\n"
                f"Example:\n\n"
                f"@route_fixture('{self.missing_routes[0].path}', '{self.missing_routes[0].method}')\n"
                f"def fixture_name():\n"
                f"    return {{\n"
                f"        'path_params': {{}},\n"
                f"        'query_params': {{}},\n"
                f"        'setup': lambda: setup_test_data(),\n"
                f"    }}\n"
            )

3. Profiling Execution Engine

Комплексное профилирование:

import cProfile
import pstats
import io
import time
import tracemalloc
from dataclasses import dataclass
from typing import List
from contextlib import contextmanager
 
@dataclass
class ProfileResult:
    """Результаты профилирования роута"""
    route: RouteInfo
    execution_time: float  # seconds
    peak_memory: int  # bytes
    memory_allocated: int  # bytes
    cpu_time: float  # seconds
    function_calls: int
    db_queries: List[Dict]
    response_size: int  # bytes
    status_code: int
    bottlenecks: List[Dict]  # Top slow functions
 
class RouteProfiler:
    """Профилирование производительности роутов"""
 
    def __init__(self, app):
        self.app = app
        self.db_queries = []
 
    def profile_route(
        self,
        route: RouteInfo,
        fixture_data: Dict[str, Any]
    ) -> ProfileResult:
        """Профилирование одного роута"""
 
        # Setup test data
        if 'setup' in fixture_data:
            fixture_data['setup']()
 
        try:
            # Начинаем профилирование
            with self._profiling_context() as profiler:
                # Выполняем запрос
                response = self._execute_request(route, fixture_data)
 
            # Собираем результаты
            result = ProfileResult(
                route=route,
                execution_time=profiler['execution_time'],
                peak_memory=profiler['peak_memory'],
                memory_allocated=profiler['memory_allocated'],
                cpu_time=profiler['cpu_time'],
                function_calls=profiler['function_calls'],
                db_queries=self.db_queries.copy(),
                response_size=len(response.data) if response.data else 0,
                status_code=response.status_code,
                bottlenecks=profiler['bottlenecks']
            )
 
            return result
 
        finally:
            # Cleanup
            if 'teardown' in fixture_data:
                fixture_data['teardown']()
            self.db_queries.clear()
 
    @contextmanager
    def _profiling_context(self):
        """Context manager для профилирования"""
 
        # CPU profiling
        profiler = cProfile.Profile()
        profiler.enable()
 
        # Memory profiling
        tracemalloc.start()
 
        # Time measurement
        start_time = time.perf_counter()
 
        # DB query tracking
        self._setup_db_query_tracking()
 
        try:
            yield {}
 
        finally:
            # Stop profiling
            end_time = time.perf_counter()
            profiler.disable()
 
            # Memory stats
            current_memory, peak_memory = tracemalloc.get_traced_memory()
            tracemalloc.stop()
 
            # CPU stats
            stats_stream = io.StringIO()
            stats = pstats.Stats(profiler, stream=stats_stream)
            stats.sort_stats('cumulative')
 
            # Extract bottlenecks
            bottlenecks = self._extract_bottlenecks(stats)
 
            # Return results via context variable
            yield {
                'execution_time': end_time - start_time,
                'peak_memory': peak_memory,
                'memory_allocated': current_memory,
                'cpu_time': stats.total_tt,
                'function_calls': stats.total_calls,
                'bottlenecks': bottlenecks
            }
 
    def _setup_db_query_tracking(self):
        """Настройка отслеживания DB запросов"""
 
        # Для SQLAlchemy
        from sqlalchemy import event
        from sqlalchemy.engine import Engine
 
        @event.listens_for(Engine, "before_cursor_execute")
        def receive_before_cursor_execute(
            conn, cursor, statement, parameters, context, executemany
        ):
            context._query_start_time = time.perf_counter()
 
        @event.listens_for(Engine, "after_cursor_execute")
        def receive_after_cursor_execute(
            conn, cursor, statement, parameters, context, executemany
        ):
            total_time = time.perf_counter() - context._query_start_time
 
            self.db_queries.append({
                'sql': statement,
                'params': parameters,
                'execution_time': total_time,
                'row_count': cursor.rowcount if hasattr(cursor, 'rowcount') else None
            })
 
    def _extract_bottlenecks(self, stats: pstats.Stats, top_n: int = 10) -> List[Dict]:
        """Извлечение самых медленных функций"""
 
        bottlenecks = []
 
        stats.sort_stats('cumulative')
        for func, (cc, nc, tt, ct, callers) in list(stats.stats.items())[:top_n]:
            filename, line_no, func_name = func
 
            # Пропускаем stdlib
            if 'site-packages' in filename or 'lib/python' in filename:
                continue
 
            bottlenecks.append({
                'function': func_name,
                'file': filename,
                'line': line_no,
                'calls': nc,
                'total_time': tt,
                'cumulative_time': ct,
                'per_call': ct / nc if nc > 0 else 0
            })
 
        return bottlenecks
 
    def _execute_request(
        self,
        route: RouteInfo,
        fixture_data: Dict[str, Any]
    ):
        """Выполнение HTTP запроса к роуту"""
 
        # Подставляем параметры в путь
        path = route.path
        for param_name, param_value in fixture_data.get('path_params', {}).items():
            path = path.replace(f'<{param_name}>', str(param_value))
            path = path.replace(f'<int:{param_name}>', str(param_value))
            path = path.replace(f'<string:{param_name}>', str(param_value))
 
        # Выполняем запрос через test client
        with self.app.test_client() as client:
            response = client.open(
                path,
                method=route.method,
                query_string=fixture_data.get('query_params', {}),
                headers=fixture_data.get('headers', {}),
                json=fixture_data.get('json_body'),
                data=fixture_data.get('form_data')
            )
 
        return response

4. Performance Analysis & Regression Detection

Анализ метрик и детекция деградации:

import pandas as pd
from typing import Optional
 
class PerformanceAnalyzer:
    """Анализ производительности и детекция регрессий"""
 
    def __init__(self, baseline_path: Optional[str] = None):
        self.baseline_df = None
        if baseline_path:
            self.baseline_df = pd.read_csv(baseline_path)
 
    def analyze_results(
        self,
        results: List[ProfileResult]
    ) -> pd.DataFrame:
        """Анализ результатов профилирования"""
 
        # Конвертируем в DataFrame
        data = []
        for result in results:
            data.append({
                'route': f"{result.route.method} {result.route.path}",
                'handler': result.route.handler_name,
                'execution_time': result.execution_time,
                'peak_memory_mb': result.peak_memory / (1024 * 1024),
                'db_query_count': len(result.db_queries),
                'db_query_time': sum(q['execution_time'] for q in result.db_queries),
                'response_size_kb': result.response_size / 1024,
                'status_code': result.status_code,
                'function_calls': result.function_calls
            })
 
        df = pd.DataFrame(data)
 
        # Добавляем derived metrics
        df['db_time_percentage'] = (df['db_query_time'] / df['execution_time'] * 100)
        df['time_per_query'] = df['db_query_time'] / df['db_query_count']
 
        return df
 
    def detect_regressions(
        self,
        current_df: pd.DataFrame,
        threshold_percentage: float = 20.0
    ) -> List[Dict]:
        """Детекция регрессий производительности"""
 
        if self.baseline_df is None:
            return []
 
        regressions = []
 
        # Merge baseline и current
        merged = current_df.merge(
            self.baseline_df,
            on='route',
            suffixes=('_current', '_baseline')
        )
 
        # Проверяем деградацию
        for _, row in merged.iterrows():
            # Execution time regression
            time_diff_pct = (
                (row['execution_time_current'] - row['execution_time_baseline']) /
                row['execution_time_baseline'] * 100
            )
 
            if time_diff_pct > threshold_percentage:
                regressions.append({
                    'route': row['route'],
                    'metric': 'execution_time',
                    'baseline': row['execution_time_baseline'],
                    'current': row['execution_time_current'],
                    'degradation_pct': time_diff_pct,
                    'severity': 'critical' if time_diff_pct > 50 else 'warning'
                })
 
            # Memory regression
            memory_diff_pct = (
                (row['peak_memory_mb_current'] - row['peak_memory_mb_baseline']) /
                row['peak_memory_mb_baseline'] * 100
            )
 
            if memory_diff_pct > threshold_percentage:
                regressions.append({
                    'route': row['route'],
                    'metric': 'peak_memory',
                    'baseline': row['peak_memory_mb_baseline'],
                    'current': row['peak_memory_mb_current'],
                    'degradation_pct': memory_diff_pct,
                    'severity': 'critical' if memory_diff_pct > 100 else 'warning'
                })
 
            # N+1 query detection
            query_diff = (
                row['db_query_count_current'] - row['db_query_count_baseline']
            )
 
            if query_diff > 5:  # Более 5 новых запросов
                regressions.append({
                    'route': row['route'],
                    'metric': 'db_queries',
                    'baseline': row['db_query_count_baseline'],
                    'current': row['db_query_count_current'],
                    'degradation_pct': query_diff / row['db_query_count_baseline'] * 100,
                    'severity': 'warning',
                    'note': 'Possible N+1 query problem'
                })
 
        return regressions
 
    def identify_slow_routes(
        self,
        df: pd.DataFrame,
        time_threshold: float = 1.0  # seconds
    ) -> pd.DataFrame:
        """Идентификация медленных роутов"""
 
        slow_routes = df[df['execution_time'] > time_threshold].copy()
        slow_routes = slow_routes.sort_values('execution_time', ascending=False)
 
        return slow_routes
 
    def find_n_plus_one_queries(
        self,
        results: List[ProfileResult]
    ) -> List[Dict]:
        """Поиск N+1 query проблем"""
 
        suspicious_routes = []
 
        for result in results:
            if not result.db_queries:
                continue
 
            # Группируем похожие запросы
            query_patterns = {}
            for query in result.db_queries:
                # Нормализуем SQL (убираем параметры)
                normalized = self._normalize_sql(query['sql'])
 
                if normalized not in query_patterns:
                    query_patterns[normalized] = []
 
                query_patterns[normalized].append(query)
 
            # Ищем паттерны с множественными выполнениями
            for pattern, queries in query_patterns.items():
                if len(queries) > 10:  # Более 10 одинаковых запросов
                    suspicious_routes.append({
                        'route': f"{result.route.method} {result.route.path}",
                        'query_pattern': pattern,
                        'execution_count': len(queries),
                        'total_time': sum(q['execution_time'] for q in queries),
                        'severity': 'high' if len(queries) > 50 else 'medium'
                    })
 
        return suspicious_routes
 
    def _normalize_sql(self, sql: str) -> str:
        """Нормализация SQL для группировки"""
        import re
 
        # Убираем IN (...) со списками ID
        sql = re.sub(r'IN \([0-9, ]+\)', 'IN (...)', sql)
 
        # Убираем конкретные значения WHERE
        sql = re.sub(r"= '[^']+'", "= ?", sql)
        sql = re.sub(r'= [0-9]+', '= ?', sql)
 
        return sql.strip()

5. Report Generation

HTML отчет с визуализацией:

import plotly.graph_objects as go
import plotly.express as px
from jinja2 import Template
 
class ReportGenerator:
    """Генерация отчетов производительности"""
 
    def generate_html_report(
        self,
        df: pd.DataFrame,
        regressions: List[Dict],
        n_plus_one: List[Dict],
        output_path: str
    ):
        """Генерация HTML отчета с графиками"""
 
        # 1. График времени выполнения
        fig_time = px.bar(
            df.sort_values('execution_time', ascending=False).head(20),
            x='route',
            y='execution_time',
            title='Top 20 Slowest Routes',
            labels={'execution_time': 'Execution Time (s)'},
            color='execution_time',
            color_continuous_scale='Reds'
        )
 
        # 2. График использования памяти
        fig_memory = px.bar(
            df.sort_values('peak_memory_mb', ascending=False).head(20),
            x='route',
            y='peak_memory_mb',
            title='Top 20 Memory-Intensive Routes',
            labels={'peak_memory_mb': 'Peak Memory (MB)'},
            color='peak_memory_mb',
            color_continuous_scale='Blues'
        )
 
        # 3. Scatter plot: DB queries vs время
        fig_db = px.scatter(
            df,
            x='db_query_count',
            y='execution_time',
            size='db_query_time',
            hover_data=['route'],
            title='Database Queries Impact on Performance',
            labels={
                'db_query_count': 'Number of Queries',
                'execution_time': 'Execution Time (s)'
            }
        )
 
        # 4. Heatmap регрессий
        if regressions:
            regression_df = pd.DataFrame(regressions)
            fig_regression = px.bar(
                regression_df.sort_values('degradation_pct', ascending=False),
                x='route',
                y='degradation_pct',
                color='severity',
                title='Performance Regressions',
                labels={'degradation_pct': 'Degradation (%)'}
            )
        else:
            fig_regression = None
 
        # Рендерим HTML
        html_template = self._get_html_template()
 
        html = html_template.render(
            summary={
                'total_routes': len(df),
                'slow_routes': len(df[df['execution_time'] > 1.0]),
                'regressions': len(regressions),
                'n_plus_one_issues': len(n_plus_one),
                'avg_time': df['execution_time'].mean(),
                'max_time': df['execution_time'].max(),
                'avg_memory': df['peak_memory_mb'].mean(),
                'total_db_queries': df['db_query_count'].sum()
            },
            fig_time=fig_time.to_html(full_html=False),
            fig_memory=fig_memory.to_html(full_html=False),
            fig_db=fig_db.to_html(full_html=False),
            fig_regression=fig_regression.to_html(full_html=False) if fig_regression else '',
            regressions=regressions,
            n_plus_one=n_plus_one,
            detailed_table=df.to_html(classes='table table-striped')
        )
 
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html)
 
    def _get_html_template(self) -> Template:
        """HTML шаблон отчета"""
        # ... (полный HTML template с Bootstrap и Plotly)
        pass

6. CI/CD Integration

pytest plugin и GitHub Actions:

# conftest.py
import pytest
 
def pytest_configure(config):
    """Конфигурация pytest plugin"""
    config.pluginmanager.register(
        RouteProfilerPlugin(config),
        "route_profiler"
    )
 
class RouteProfilerPlugin:
    def __init__(self, config):
        self.config = config
 
    @pytest.fixture(scope='session', autouse=True)
    def profile_all_routes(self, app):
        """Автоматическое профилирование всех роутов"""
 
        discovery = RouteDiscovery()
        routes = discovery.discover_flask_routes(app)
 
        profiler = RouteProfiler(app)
        analyzer = PerformanceAnalyzer(baseline_path='baseline.csv')
 
        results = []
 
        for route in routes:
            # Получаем fixture
            fixture_data = fixture_registry.get(route.path, route.method)
 
            if not fixture_data:
                pytest.fail(f"Missing fixture for {route.method} {route.path}")
 
            # Профилируем
            result = profiler.profile_route(route, fixture_data)
            results.append(result)
 
        # Анализируем
        df = analyzer.analyze_results(results)
        regressions = analyzer.detect_regressions(df)
        n_plus_one = analyzer.find_n_plus_one_queries(results)
 
        # Генерируем отчет
        report_gen = ReportGenerator()
        report_gen.generate_html_report(
            df, regressions, n_plus_one,
            'performance_report.html'
        )
 
        # Сохраняем baseline
        df.to_csv('current_baseline.csv', index=False)
 
        # Fail если есть критичные регрессии
        critical_regressions = [
            r for r in regressions if r['severity'] == 'critical'
        ]
 
        if critical_regressions:
            pytest.fail(
                f"❌ {len(critical_regressions)} critical performance regressions detected!"
            )

GitHub Actions workflow:

name: Performance Profiling
 
on:
  push:
    branches: [main, develop]
  pull_request:
 
jobs:
  performance-test:
    runs-on: ubuntu-latest
 
    steps:
      - uses: actions/checkout@v3
 
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
 
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
 
      - name: Download baseline
        run: |
          # Скачиваем baseline из artifacts предыдущего run
          gh run download --name performance-baseline || true
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
 
      - name: Run performance profiling
        run: |
          pytest tests/ --profile-routes --baseline=baseline.csv
 
      - name: Upload performance report
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: performance-report
          path: performance_report.html
 
      - name: Upload new baseline
        uses: actions/upload-artifact@v3
        with:
          name: performance-baseline
          path: current_baseline.csv
 
      - name: Comment on PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const report = fs.readFileSync('performance_summary.md', 'utf8');
 
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: report
            });

Результаты

Route Profiler трансформировал ad-hoc performance testing в систематический, автоматизированный процесс с полным покрытием всех маршрутов приложения.

Предотвращение проблем:

  • 100% coverage — все роуты профилируются автоматически
  • Раннее обнаружение — регрессии ловятся в CI/CD до production
  • N+1 detection — автоматическое выявление ORM antipatterns
  • Baseline tracking — исторический контекст для оценки изменений

Visibility & Insights:

  • Детальные метрики — время, память, DB queries для каждого роута
  • Bottleneck identification — точные функции, замедляющие работу
  • Visual reports — графики и дашборды для stakeholders
  • Data-driven optimization — понятно, что оптимизировать в первую очередь

Developer Experience:

  • Enforce test data — нельзя добавить роут без fixture (fail fast)
  • Automatic discovery — не нужно вручную перечислять роуты
  • Clear feedback — точные отчеты о проблемах с предложениями
  • CI/CD integration — seamless workflow

Бизнес-эффект:

  • Better UX — быстрые роуты = довольные пользователи
  • Cost savings — оптимизация снижает server costs
  • Incident prevention — проблемы не доходят до production
  • Scalability confidence — понимание bottlenecks перед масштабированием

Ключевые выводы

Shift-left performance testing: Традиционно performance testing делается после разработки, часто уже в production. Route Profiler сдвигает его влево — в CI/CD, делая частью обычного workflow.

Automation > Manual testing: Невозможно вручную профилировать десятки/сотни роутов при каждом изменении. Автоматизация — единственный способ поддерживать performance quality в растущем приложении.

Data-driven optimization: Вместо оптимизации "на глаз" или по подозрениям — точные данные о том, что медленно, почему медленно, и насколько критично.

Test data enforcement: Требование fixtures для каждого роута — это не бюрократия, а гарантия тестируемости. Если нет mock данных — роут по сути untested.

Holistic profiling: Профилирование не только времени, но и памяти, DB queries, CPU дает полную картину для optimization. Оптимизировать только время — значит упустить memory leaks или database bottlenecks.


Применимость:

  • SaaS продукты с API
  • E-commerce (критична производительность checkout)
  • Content platforms (множество routes)
  • Микросервисы (профилирование каждого сервиса)

Система особенно ценна для быстрорастущих приложений, где performance деградация может незаметно накапливаться с каждым feature release.

Технические навыки:

  • Profiling (cProfile, memory_profiler, tracemalloc)
  • Data analysis (Pandas)
  • Visualization (Plotly)
  • Testing infrastructure (pytest plugins)
  • DevOps (CI/CD integration)

Похожие материалы

Проекты с похожими технологиями и задачами

PVS-Studio — Система автоматизированного тестирования

QA Automation Engineer • 2023

Комплексная система E2E-тестирования на Selenium для полного покрытия корпоративного сайта pvs-studio.com. Автоматизация регрессионного тестирования критически важного бизнес-ресурса.

  • Selenium
  • Python
  • pytest
  • Docker
  • CI/CD
Читать детальный кейс →

Цифровой тьютор

Backend-разработчик • 2020

Аналитическая платформа для дирекции университета с инструментами оценки усвоения материала студентами и рекомендациями по оптимизации учебного процесса

  • Python
  • Django
  • PostgreSQL
  • Redis
  • Celery
  • +2
Читать детальный кейс →

Perfector — Визуальное регрессионное тестирование

Technical Lead & Architect • 2024

Автоматизированная система визуального тестирования UI для предотвращения непреднамеренных изменений верстки. Снепшоты страниц + OpenCV для детекции различий + автоматические отчеты для команды разработки.

  • Python
  • OpenCV
  • Selenium
  • Pillow
  • Docker
  • +2
Читать детальный кейс →