Skip to main content

Building a Motivational Quotes API with FastAPI

Константин Потапов
25 min

Step-by-step guide to creating a REST API for motivational quotes using FastAPI, SQLAlchemy, and Pydantic. From basic structure to production-ready solution with validation, migrations, and documentation.

In this guide, we'll build a production-ready REST API for retrieving motivational quotes. The application will be ready for production: with validation, database migrations, auto-generated documentation, and test coverage.

What you'll build:

  • 🚀 Fast FastAPI backend with async/await
  • 🗄️ PostgreSQL with SQLAlchemy 2.0
  • ✅ Pydantic for data validation
  • 📚 Auto-generated OpenAPI documentation
  • 🧪 Test coverage with pytest
  • 🔄 Migrations with Alembic

Target audience: Developers familiar with Python who want to learn FastAPI and modern API development approaches.

Application Architecture

Project Structure: Why This Way?

quotes-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI entry point
│   ├── config.py            # Settings (DATABASE_URL, etc.)
│   ├── database.py          # Database connection
│   ├── models.py            # SQLAlchemy models (DB tables)
│   ├── schemas.py           # Pydantic schemas (DTOs for API)
│   ├── crud.py              # CRUD operations
│   └── routers/
│       ├── __init__.py
│       └── quotes.py        # Quote endpoints
├── alembic/                 # Database migrations
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   └── test_quotes.py
├── requirements.txt
├── .env                     # Environment variables
└── README.md

Important to understand: models.py and schemas.py are NOT the same thing!

models.py vs schemas.py: What's the Difference?

This is a common source of confusion for FastAPI newcomers. Let's break it down:

models.py — SQLAlchemy Models (Database Layer)

  • Describe database table structure
  • Work directly with the database
  • Contain SQLAlchemy types: Integer, String, DateTime
  • Example: id: Mapped[int] = mapped_column(Integer, primary_key=True)

schemas.py — Pydantic Models (API Layer, DTOs)

  • Describe API data format (Data Transfer Objects)
  • Validate incoming/outgoing JSON
  • Contain Python types: int, str, datetime
  • Example: id: int, content: str

Why separate them?

  1. Different responsibilities — DB model may have fields that shouldn't be returned in API (e.g., password_hash)
  2. Different validation — API might require email format, while DB just stores a string
  3. Flexibility — Can change API format without changing DB structure and vice versa

Example:

# models.py (Database)
class User(Base):
    id: Mapped[int]
    email: Mapped[str]
    password_hash: Mapped[str]  # ⚠️ Should NOT go to API!
    created_at: Mapped[datetime]
 
# schemas.py (API)
class UserResponse(BaseModel):
    id: int
    email: str  # ✅ Without password!
    # created_at can be hidden from user

Why Not Extract DTOs to a Separate dto/ Folder?

For small projects (like ours):

  • ✅ Single schemas.py file — easier navigation
  • ✅ All Pydantic schemas in one place
  • ✅ Less nesting (app/schemas.py instead of app/dto/quote_dto.py)

For large projects (10+ entities):

app/
├── quotes/
│   ├── models.py       # Quote SQLAlchemy model
│   ├── schemas.py      # QuoteCreate, QuoteResponse
│   ├── crud.py         # CRUD for quotes
│   └── router.py       # Endpoints
├── users/
│   ├── models.py
│   ├── schemas.py
│   ├── crud.py
│   └── router.py

Recommendation: Start with flat structure (app/schemas.py), switch to modular when project grows (5+ files in app/).

Why crud.py Separate from models.py?

models.py — what (data structure) crud.py — how (operations on data)

# models.py — declaratively describe table
class Quote(Base):
    id: Mapped[int]
    content: Mapped[str]
 
# crud.py — imperatively describe logic
async def get_quote(db, quote_id):
    result = await db.execute(...)
    return result.scalar_one_or_none()

Why separate?

  1. Single Responsibility Principle — model only knows about structure
  2. Testability — easy to mock CRUD functions
  3. Reusability — one model, different CRUD (admin vs user API)

Technology Stack

ComponentTechnologyVersion
FrameworkFastAPI0.109+
ORMSQLAlchemy2.0+
ValidationPydantic2.0+
DatabasePostgreSQL15+
MigrationsAlembic1.13+
Testingpytest + httpxlatest
Serveruvicorn0.27+

Step 1: Installing Dependencies

requirements.txt

# Core
fastapi==0.109.0
uvicorn[standard]==0.27.0
python-dotenv==1.0.0
 
# Database
sqlalchemy==2.0.25
psycopg2-binary==2.9.9
alembic==1.13.1
 
# Validation
pydantic==2.5.3
pydantic-settings==2.1.0
 
# Testing
pytest==7.4.4
pytest-asyncio==0.23.3
httpx==0.26.0

Installation

Important: Examples use pip for compatibility and simplicity, but modern projects should use Poetry or uv for dependency management.

With pip (traditional approach):

# Create virtual environment
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate   # Windows
 
# Install dependencies
pip install -r requirements.txt

With Poetry (recommended for new projects):

# Initialize project
poetry init
 
# Add dependencies
poetry add fastapi uvicorn[standard] sqlalchemy psycopg2-binary
 
# Install and activate environment
poetry install
poetry shell

With uv (fastest package manager):

# Create project
uv venv
source .venv/bin/activate
 
# Install dependencies
uv pip install -r requirements.txt
 
# Or directly
uv pip install fastapi uvicorn[standard] sqlalchemy

Learn more about modern tools: See Poetry and uv Guide — comparison of pip, Poetry, uv with practical examples.

Why Poetry/uv Are Better Than pip?

pip:

  • ❌ Doesn't guarantee reproducibility (no lock file)
  • ❌ Slow dependency installation
  • ❌ Manual management of requirements.txt and requirements-dev.txt

Poetry:

  • poetry.lock guarantees identical versions
  • ✅ Automatic conflict resolution
  • ✅ Built-in dev/prod dependency separation

uv:

  • ✅ 10-100x faster than pip (written in Rust)
  • ✅ Compatible with pip (drop-in replacement)
  • ✅ Automatic virtual environment management

Recommendation: Use pip for learning, Poetry or uv for production projects.

Step 2: Environment Configuration

.env File

# Database
DATABASE_URL=postgresql://postgres:password@localhost:5432/quotes_db
 
# App
APP_NAME="Quotes API"
APP_VERSION="1.0.0"
DEBUG=True
 
# CORS (for frontend)
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000

app/config.py

from pydantic_settings import BaseSettings, SettingsConfigDict
 
class Settings(BaseSettings):
    """Application settings from environment variables"""
 
    # Database
    database_url: str
 
    # App
    app_name: str = "Quotes API"
    app_version: str = "1.0.0"
    debug: bool = False
 
    # CORS
    allowed_origins: list[str] = ["http://localhost:3000"]
 
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )
 
settings = Settings()

Step 3: Database Connection

app/database.py

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_sessionmaker,
)
from sqlalchemy.orm import DeclarativeBase
from app.config import settings
 
# Create async engine
# Replace postgresql:// with postgresql+asyncpg://
DATABASE_URL = settings.database_url.replace(
    "postgresql://", "postgresql+asyncpg://"
)
 
engine = create_async_engine(
    DATABASE_URL,
    echo=settings.debug,  # SQL logging in dev
    future=True,
)
 
# Session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
 
# Base class for models
class Base(DeclarativeBase):
    pass
 
# Dependency for getting session
async def get_db() -> AsyncSession:
    """Creates and closes database session"""
    async with AsyncSessionLocal() as session:
        yield session

Why async/await?

Traditional sync approach:

# Blocks entire process during DB query
result = db.execute(query)  # ⏸️ All other requests wait

Async approach:

# While waiting for DB, process other requests
result = await db.execute(query)  # ⚡ Parallelize work

Result: FastAPI can handle thousands of concurrent requests on a single worker.

Why expire_on_commit=False?

By default SQLAlchemy invalidates all objects after commit():

quote = Quote(content="Test")
db.add(quote)
await db.commit()
print(quote.id)  # ❌ DetachedInstanceError — object detached from session!

With expire_on_commit=False:

quote = Quote(content="Test")
db.add(quote)
await db.commit()
print(quote.id)  # ✅ Works — object still accessible

Important: In FastAPI this is safe because each request gets its own session.

Why get_db() as Dependency?

Problem: Need to create and close DB session for each request.

Without dependency (bad):

@router.get("/quotes/{id}")
async def get_quote(id: int):
    db = AsyncSessionLocal()  # ❌ Need to close manually!
    quote = await db.get(Quote, id)
    await db.close()  # ❌ Forgot — connection leak!
    return quote

With dependency (good):

@router.get("/quotes/{id}")
async def get_quote(id: int, db: AsyncSession = Depends(get_db)):
    quote = await db.get(Quote, id)  # ✅ Session auto-closes
    return quote

Bonus: Easy to replace get_db() with test database in tests.

Important: Use asyncpg driver for async PostgreSQL work.

pip install asyncpg

Step 4: Database Models

app/models.py

from datetime import datetime
from sqlalchemy import String, Text, DateTime, Integer
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
 
class Quote(Base):
    """Quote model in database"""
 
    __tablename__ = "quotes"
 
    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    author: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
    category: Mapped[str | None] = mapped_column(String(100), nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        nullable=False,
    )
 
    def __repr__(self) -> str:
        return f"<Quote(id={self.id}, author='{self.author}')>"

Why SQLAlchemy 2.0 style?

  • Typed mappings with Mapped[T]
  • Better type hints support
  • Fewer runtime errors

Step 5: Pydantic Schemas

app/schemas.py

from datetime import datetime
from pydantic import BaseModel, Field, ConfigDict
 
class QuoteBase(BaseModel):
    """Base quote schema"""
 
    content: str = Field(..., min_length=1, max_length=1000)
    author: str = Field(..., min_length=1, max_length=200)
    category: str | None = Field(None, max_length=100)
 
class QuoteCreate(QuoteBase):
    """Schema for creating quote"""
    pass
 
class QuoteUpdate(BaseModel):
    """Schema for updating quote"""
 
    content: str | None = Field(None, min_length=1, max_length=1000)
    author: str | None = Field(None, min_length=1, max_length=200)
    category: str | None = Field(None, max_length=100)
 
class QuoteResponse(QuoteBase):
    """Schema for quote response"""
 
    id: int
    created_at: datetime
 
    model_config = ConfigDict(from_attributes=True)
 
class QuoteList(BaseModel):
    """Schema for quote list"""
 
    quotes: list[QuoteResponse]
    total: int

Why Three Different Schemas?

This is the Input-Output segregation pattern. Let's break it down:

1. QuoteCreate (Input for POST)

class QuoteCreate(BaseModel):
    content: str = Field(..., min_length=1)
    author: str = Field(..., min_length=1)

Why are all fields required?

  • When creating a quote, must provide content and author
  • Field(...) means "required field"
  • min_length=1 protects against empty strings

2. QuoteUpdate (Input for PATCH)

class QuoteUpdate(BaseModel):
    content: str | None = None  # Optional!
    author: str | None = None   # Optional!

Why are all fields optional?

  • PATCH = partial update
  • Want to change only author → send {"author": "New"}
  • Want to change only content → send {"content": "New"}

Without schema separation:

# ❌ Bad — one schema for everything
await client.post("/quotes/", json={})  # Creates empty quote!

3. QuoteResponse (Output for GET)

class QuoteResponse(QuoteBase):
    id: int              # ✅ Added ID (generated by DB)
    created_at: datetime # ✅ Added timestamp

Why separate schema for response?

  • Client doesn't send id when creating (DB generates it)
  • Client doesn't send created_at (DB sets it)
  • But in response we must return these fields

model_config = ConfigDict(from_attributes=True) — Pydantic magic:

# Without from_attributes:
db_quote = Quote(id=1, content="Test")  # SQLAlchemy object
return QuoteResponse(**db_quote)  # ❌ Error!
 
# With from_attributes:
return QuoteResponse.model_validate(db_quote)  # ✅ Works!

FastAPI automatically calls model_validate() when using response_model=QuoteResponse.

Step 6: CRUD Operations

app/crud.py

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Quote
from app.schemas import QuoteCreate, QuoteUpdate
 
async def create_quote(db: AsyncSession, quote: QuoteCreate) -> Quote:
    """Creates new quote"""
 
    # Convert Pydantic model to SQLAlchemy model
    db_quote = Quote(**quote.model_dump())
 
    db.add(db_quote)           # Add to session
    await db.commit()          # Save to DB
    await db.refresh(db_quote) # Refresh object (get ID)
 
    return db_quote
 
async def get_quote(db: AsyncSession, quote_id: int) -> Quote | None:
    """Gets quote by ID"""
 
    result = await db.execute(select(Quote).where(Quote.id == quote_id))
    return result.scalar_one_or_none()  # None if not found
 
async def get_quotes(
    db: AsyncSession,
    skip: int = 0,
    limit: int = 100,
    author: str | None = None,
    category: str | None = None,
) -> list[Quote]:
    """Gets list of quotes with filtering"""
 
    query = select(Quote)
 
    if author:
        query = query.where(Quote.author.ilike(f"%{author}%"))
 
    if category:
        query = query.where(Quote.category == category)
 
    query = query.offset(skip).limit(limit)
 
    result = await db.execute(query)
    return list(result.scalars().all())
 
async def get_quotes_count(
    db: AsyncSession,
    author: str | None = None,
    category: str | None = None,
) -> int:
    """Counts total quotes"""
 
    query = select(func.count(Quote.id))
 
    if author:
        query = query.where(Quote.author.ilike(f"%{author}%"))
 
    if category:
        query = query.where(Quote.category == category)
 
    result = await db.execute(query)
    return result.scalar_one()
 
async def get_random_quote(db: AsyncSession) -> Quote | None:
    """Gets random quote"""
 
    # PostgreSQL specific: ORDER BY RANDOM()
    query = select(Quote).order_by(func.random()).limit(1)
 
    result = await db.execute(query)
    return result.scalar_one_or_none()
 
async def update_quote(
    db: AsyncSession,
    quote_id: int,
    quote_update: QuoteUpdate,
) -> Quote | None:
    """Updates quote"""
 
    db_quote = await get_quote(db, quote_id)
 
    if not db_quote:
        return None
 
    update_data = quote_update.model_dump(exclude_unset=True)
 
    for field, value in update_data.items():
        setattr(db_quote, field, value)
 
    await db.commit()
    await db.refresh(db_quote)
    return db_quote
 
async def delete_quote(db: AsyncSession, quote_id: int) -> bool:
    """Deletes quote"""
 
    db_quote = await get_quote(db, quote_id)
 
    if not db_quote:
        return False
 
    await db.delete(db_quote)
    await db.commit()
    return True

Key CRUD Concepts

1. Why quote.model_dump() Instead of quote.dict()?

Pydantic v2 changed the API:

# Pydantic v1 (old way)
quote.dict()  # ❌ Deprecated
 
# Pydantic v2 (new way)
quote.model_dump()  # ✅ Current method

2. Why await db.refresh(db_quote) After Commit?

db_quote = Quote(content="Test")  # id = None (not saved yet)
db.add(db_quote)
await db.commit()                 # DB generates id = 1
 
# Without refresh:
print(db_quote.id)  # None ❌
 
# With refresh:
await db.refresh(db_quote)
print(db_quote.id)  # 1 ✅

3. Why scalar_one_or_none() vs first() or one()?

SQLAlchemy 2.0 style — three options:

result = await db.execute(select(Quote).where(...))
 
# .one() — strictly one record, else error
quote = result.scalar_one()  # ❌ NoResultFound if empty
 
# .one_or_none() — one record or None
quote = result.scalar_one_or_none()  # ✅ None if not found
 
# .first() — first record or None (deprecated in 2.0)
quote = result.first()  # ⚠️ Better not use

4. Why query.offset(skip).limit(limit)?

This is pagination (splitting into pages):

# First page: 10 quotes
get_quotes(skip=0, limit=10)  # Records 1-10
 
# Second page: next 10
get_quotes(skip=10, limit=10)  # Records 11-20
 
# Third page
get_quotes(skip=20, limit=10)  # Records 21-30

Formula: skip = (page - 1) * limit

5. Why update_data = quote_update.model_dump(exclude_unset=True)?

Without exclude_unset:

# Client sent only {"author": "New"}
quote_update = QuoteUpdate(author="New")
 
quote_update.model_dump()
# {"author": "New", "content": None, "category": None}
# ❌ Overwrites content and category with None!

With exclude_unset=True:

quote_update.model_dump(exclude_unset=True)
# {"author": "New"}
# ✅ Updates only author, leaves rest untouched

Step 7: API Endpoints

app/routers/quotes.py

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
 
from app import crud, schemas
from app.database import get_db
 
router = APIRouter(
    prefix="/quotes",
    tags=["quotes"],
)
 
@router.post(
    "/",
    response_model=schemas.QuoteResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create quote",
)
async def create_quote(
    quote: schemas.QuoteCreate,
    db: AsyncSession = Depends(get_db),
):
    """Creates new quote"""
    return await crud.create_quote(db, quote)
 
@router.get(
    "/",
    response_model=schemas.QuoteList,
    summary="Get quote list",
)
async def list_quotes(
    skip: int = Query(0, ge=0, description="Skip N records"),
    limit: int = Query(10, ge=1, le=100, description="Limit records"),
    author: str | None = Query(None, description="Filter by author"),
    category: str | None = Query(None, description="Filter by category"),
    db: AsyncSession = Depends(get_db),
):
    """Gets list of quotes with pagination and filtering"""
 
    quotes = await crud.get_quotes(
        db,
        skip=skip,
        limit=limit,
        author=author,
        category=category,
    )
 
    total = await crud.get_quotes_count(db, author=author, category=category)
 
    return schemas.QuoteList(quotes=quotes, total=total)
 
@router.get(
    "/random",
    response_model=schemas.QuoteResponse,
    summary="Get random quote",
)
async def random_quote(db: AsyncSession = Depends(get_db)):
    """Gets random quote"""
 
    quote = await crud.get_random_quote(db)
 
    if not quote:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No quotes found",
        )
 
    return quote
 
@router.get(
    "/{quote_id}",
    response_model=schemas.QuoteResponse,
    summary="Get quote by ID",
)
async def get_quote(
    quote_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Gets quote by ID"""
 
    quote = await crud.get_quote(db, quote_id)
 
    if not quote:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Quote {quote_id} not found",
        )
 
    return quote
 
@router.patch(
    "/{quote_id}",
    response_model=schemas.QuoteResponse,
    summary="Update quote",
)
async def update_quote(
    quote_id: int,
    quote_update: schemas.QuoteUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Partially updates quote"""
 
    quote = await crud.update_quote(db, quote_id, quote_update)
 
    if not quote:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Quote {quote_id} not found",
        )
 
    return quote
 
@router.delete(
    "/{quote_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete quote",
)
async def delete_quote(
    quote_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Deletes quote"""
 
    deleted = await crud.delete_quote(db, quote_id)
 
    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Quote {quote_id} not found",
        )

Why These Decorators?

1. response_model=schemas.QuoteResponse

@router.get("/quotes/{quote_id}", response_model=schemas.QuoteResponse)
async def get_quote(quote_id: int, db: AsyncSession = Depends(get_db)):
    return await crud.get_quote(db, quote_id)

What does response_model do?

  • ✅ Validates response (error if function returns wrong type)
  • ✅ Converts SQLAlchemy object to JSON
  • ✅ Filters fields (won't return password_hash if in model)
  • ✅ Generates OpenAPI schema for documentation

Without response_model (bad):

return db_quote  # ❌ Returns SQLAlchemy object, JSON won't serialize!

2. status_code=status.HTTP_201_CREATED

REST API semantics:

# POST (create) → 201 Created
@router.post("/", status_code=201)
 
# GET (read) → 200 OK (default)
@router.get("/")
 
# PATCH (update) → 200 OK
@router.patch("/{id}")
 
# DELETE (delete) → 204 No Content
@router.delete("/{id}", status_code=204)

Why status.HTTP_201_CREATED instead of 201?

# Magic numbers (bad)
@router.post("/", status_code=201)  # ❌ What does 201 mean?
 
# Constants (good)
@router.post("/", status_code=status.HTTP_201_CREATED)  # ✅ Clear!

3. Query() for Parameter Validation

skip: int = Query(0, ge=0, description="Skip N records")
limit: int = Query(10, ge=1, le=100, description="Limit records")

Why Query() if we can just use skip: int = 0?

Without Query:

async def list_quotes(skip: int = 0, limit: int = 10):
    # ❌ Client can send skip=-10, limit=999999

With Query:

async def list_quotes(
    skip: int = Query(0, ge=0),      # ✅ >= 0
    limit: int = Query(10, ge=1, le=100)  # ✅ from 1 to 100
):

If client sends ?limit=500, gets 422 Unprocessable Entity:

{
  "detail": [
    {
      "loc": ["query", "limit"],
      "msg": "ensure this value is less than or equal to 100",
      "type": "value_error.number.not_le"
    }
  ]
}

4. Why summary and tags?

@router.get("/random", tags=["quotes"], summary="Get random quote")

Generates beautiful documentation:

  • tags — groups endpoints in Swagger UI
  • summary — brief description in list
  • Docstring — detailed description when expanded

Key principles:

  • Use Depends(get_db) for session injection
  • Query() for query parameter validation
  • HTTP status codes (201, 404, 204)
  • Descriptions for auto-documentation

Step 8: Main Application File

app/main.py

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
 
from app.config import settings
from app.database import engine, Base
from app.routers import quotes
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifecycle events: startup and shutdown"""
 
    # Startup: create tables (only for dev!)
    # In production use Alembic migrations
    if settings.debug:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
 
    yield
 
    # Shutdown: close connections
    await engine.dispose()
 
app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    description="REST API for motivational quotes",
    lifespan=lifespan,
)
 
# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
 
# Include routers
app.include_router(quotes.router)
 
@app.get("/", tags=["root"])
async def root():
    """Health check endpoint"""
    return {
        "app": settings.app_name,
        "version": settings.app_version,
        "status": "running",
    }

Why Do We Need lifespan?

Problem: Need to execute code on app startup/shutdown.

Old way (deprecated):

@app.on_event("startup")
async def startup():
    # Code on startup
 
@app.on_event("shutdown")
async def shutdown():
    # Code on shutdown

New way (recommended):

@asynccontextmanager
async def lifespan(app: FastAPI):
    # ⬇️ Code BEFORE yield — startup
    print("Starting up...")
    yield
    # ⬇️ Code AFTER yield — shutdown
    print("Shutting down...")

Why await engine.dispose() in shutdown?

# Close all DB connections on shutdown
await engine.dispose()

Without dispose:

  • PostgreSQL connections remain open
  • Connection pool might exhaust on restart
  • In production this is resource leak

Why CORS middleware?

Problem: Frontend on localhost:3000 can't call API on localhost:8000.

Browser blocks requests:

Access to fetch at 'http://localhost:8000/quotes'
from origin 'http://localhost:3000' has been blocked by CORS policy

Solution:

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],  # Allow frontend
    allow_credentials=True,
    allow_methods=["*"],  # All HTTP methods (GET, POST, etc.)
    allow_headers=["*"],  # All headers
)

Production setup:

# .env
ALLOWED_ORIGINS=https://myapp.com,https://www.myapp.com
 
# config.py
allowed_origins: list[str] = ["https://myapp.com"]

⚠️ Never use allow_origins=["*"] in production!

Step 9: Migrations with Alembic

Initializing Alembic

alembic init alembic

alembic/env.py

Update env.py to support async:

from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
 
from app.config import settings
from app.database import Base
from app.models import Quote  # Import models!
 
# Alembic Config
config = context.config
 
# Set DATABASE_URL from settings
config.set_main_option("sqlalchemy.url", settings.database_url)
 
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
 
target_metadata = Base.metadata
 
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
 
    with context.begin_transaction():
        context.run_migrations()
 
async def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
 
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
 
    await connectable.dispose()
 
def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
 
    with context.begin_transaction():
        context.run_migrations()
 
if context.is_offline_mode():
    run_migrations_offline()
else:
    import asyncio
    asyncio.run(run_migrations_online())

Creating First Migration

# Auto-generate migration
alembic revision --autogenerate -m "Create quotes table"
 
# Apply migration
alembic upgrade head

Step 10: Running the Application

Local Run

# Development mode with hot reload
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

Seeding Test Data

Create scripts/seed_data.py:

import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import AsyncSessionLocal
from app.models import Quote
 
quotes_data = [
    {
        "content": "Be yourself; everyone else is already taken.",
        "author": "Oscar Wilde",
        "category": "inspiration",
    },
    {
        "content": "The only way to do great work is to love what you do.",
        "author": "Steve Jobs",
        "category": "motivation",
    },
    {
        "content": "Life is what happens when you're busy making other plans.",
        "author": "John Lennon",
        "category": "life",
    },
]
 
async def seed_quotes():
    """Adds test quotes to database"""
 
    async with AsyncSessionLocal() as db:
        for data in quotes_data:
            quote = Quote(**data)
            db.add(quote)
 
        await db.commit()
        print(f"✅ Added {len(quotes_data)} quotes")
 
if __name__ == "__main__":
    asyncio.run(seed_quotes())

Run:

python scripts/seed_data.py

Check API

Open auto-generated documentation:

Step 11: Testing

tests/conftest.py

import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
 
from app.main import app
from app.database import Base, get_db
 
# Test database (in-memory SQLite)
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
 
@pytest.fixture(scope="function")
async def db_session():
    """Creates test database session"""
 
    engine = create_async_engine(TEST_DATABASE_URL, echo=False)
 
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
 
    TestSessionLocal = async_sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
 
    async with TestSessionLocal() as session:
        yield session
 
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
 
    await engine.dispose()
 
@pytest.fixture(scope="function")
async def client(db_session):
    """Creates test HTTP client"""
 
    async def override_get_db():
        yield db_session
 
    app.dependency_overrides[get_db] = override_get_db
 
    async with AsyncClient(app=app, base_url="http://test") as ac:
        yield ac
 
    app.dependency_overrides.clear()

tests/test_quotes.py

import pytest
 
@pytest.mark.asyncio
async def test_create_quote(client):
    """Test creating quote"""
 
    response = await client.post(
        "/quotes/",
        json={
            "content": "Test quote",
            "author": "Test Author",
            "category": "test",
        },
    )
 
    assert response.status_code == 201
    data = response.json()
    assert data["content"] == "Test quote"
    assert data["author"] == "Test Author"
    assert "id" in data
 
@pytest.mark.asyncio
async def test_get_quote(client):
    """Test getting quote"""
 
    # Create quote
    create_response = await client.post(
        "/quotes/",
        json={"content": "Get test", "author": "Author"},
    )
    quote_id = create_response.json()["id"]
 
    # Get quote
    response = await client.get(f"/quotes/{quote_id}")
 
    assert response.status_code == 200
    data = response.json()
    assert data["id"] == quote_id
    assert data["content"] == "Get test"
 
@pytest.mark.asyncio
async def test_get_nonexistent_quote(client):
    """Test getting nonexistent quote"""
 
    response = await client.get("/quotes/999")
 
    assert response.status_code == 404
    assert "not found" in response.json()["detail"].lower()
 
@pytest.mark.asyncio
async def test_list_quotes(client):
    """Test getting quote list"""
 
    # Create several quotes
    for i in range(3):
        await client.post(
            "/quotes/",
            json={"content": f"Quote {i}", "author": f"Author {i}"},
        )
 
    # Get list
    response = await client.get("/quotes/")
 
    assert response.status_code == 200
    data = response.json()
    assert data["total"] == 3
    assert len(data["quotes"]) == 3
 
@pytest.mark.asyncio
async def test_filter_quotes_by_author(client):
    """Test filtering by author"""
 
    await client.post(
        "/quotes/",
        json={"content": "Quote 1", "author": "Oscar Wilde"},
    )
    await client.post(
        "/quotes/",
        json={"content": "Quote 2", "author": "Steve Jobs"},
    )
 
    response = await client.get("/quotes/?author=Oscar")
 
    assert response.status_code == 200
    data = response.json()
    assert data["total"] == 1
    assert data["quotes"][0]["author"] == "Oscar Wilde"
 
@pytest.mark.asyncio
async def test_update_quote(client):
    """Test updating quote"""
 
    # Create quote
    create_response = await client.post(
        "/quotes/",
        json={"content": "Original", "author": "Author"},
    )
    quote_id = create_response.json()["id"]
 
    # Update
    response = await client.patch(
        f"/quotes/{quote_id}",
        json={"content": "Updated"},
    )
 
    assert response.status_code == 200
    data = response.json()
    assert data["content"] == "Updated"
    assert data["author"] == "Author"  # Unchanged
 
@pytest.mark.asyncio
async def test_delete_quote(client):
    """Test deleting quote"""
 
    # Create quote
    create_response = await client.post(
        "/quotes/",
        json={"content": "Delete me", "author": "Author"},
    )
    quote_id = create_response.json()["id"]
 
    # Delete
    response = await client.delete(f"/quotes/{quote_id}")
 
    assert response.status_code == 204
 
    # Verify deleted
    get_response = await client.get(f"/quotes/{quote_id}")
    assert get_response.status_code == 404
 
@pytest.mark.asyncio
async def test_random_quote(client):
    """Test getting random quote"""
 
    # Create quote
    await client.post(
        "/quotes/",
        json={"content": "Random quote", "author": "Author"},
    )
 
    # Get random
    response = await client.get("/quotes/random")
 
    assert response.status_code == 200
    data = response.json()
    assert "content" in data
    assert "author" in data

Running Tests

# Install SQLite dependency
pip install aiosqlite
 
# Run tests
pytest
 
# With coverage
pytest --cov=app --cov-report=term-missing

Production Deployment

Docker

Dockerfile:

FROM python:3.11-slim
 
WORKDIR /app
 
# Dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
# Application code
COPY app ./app
COPY alembic ./alembic
COPY alembic.ini .
 
# Run migrations and server
CMD alembic upgrade head && \
    uvicorn app.main:app --host 0.0.0.0 --port 8000

docker-compose.yml:

version: "3.8"
 
services:
  db:
    image: postgres:15
    environment:
      POSTGRES_DB: quotes_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
 
  api:
    build: .
    environment:
      DATABASE_URL: postgresql://postgres:password@db:5432/quotes_db
      APP_NAME: "Quotes API"
      DEBUG: "False"
    ports:
      - "8000:8000"
    depends_on:
      - db
 
volumes:
  postgres_data:

Run:

docker-compose up --build

Production Settings

Recommendations for production:

  1. Disable debug mode: DEBUG=False in .env
  2. Use Alembic for migrations (not create_all())
  3. Configure connection pool:
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=0,
    pool_pre_ping=True,
)
  1. Add rate limiting (e.g., slowapi)
  2. Configure logging:
import logging
 
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
  1. Use environment-specific configs:
class Settings(BaseSettings):
    environment: str = "development"
 
    @property
    def is_production(self) -> bool:
        return self.environment == "production"

Improvements and Extensions

1. Authentication (JWT)

pip install python-jose[cryptography] passlib[bcrypt]

Add User model, /auth/register, /auth/login endpoints.

2. Caching (Redis)

pip install redis[hiredis]

Cache /quotes/random for speedup:

@router.get("/random")
async def random_quote(
    db: AsyncSession = Depends(get_db),
    cache: Redis = Depends(get_redis),
):
    cached = await cache.get("random_quote")
 
    if cached:
        return json.loads(cached)
 
    quote = await crud.get_random_quote(db)
    await cache.setex("random_quote", 60, json.dumps(quote))
 
    return quote

Add endpoint /quotes/search?q=motivation:

from sqlalchemy import or_
 
async def search_quotes(db: AsyncSession, query: str) -> list[Quote]:
    result = await db.execute(
        select(Quote).where(
            or_(
                Quote.content.ilike(f"%{query}%"),
                Quote.author.ilike(f"%{query}%"),
            )
        )
    )
    return list(result.scalars().all())

4. Background Tasks (Celery)

For sending daily quote emails:

pip install celery[redis]
from celery import Celery
 
celery = Celery("tasks", broker="redis://localhost:6379")
 
@celery.task
def send_daily_quote():
    # Get random quote and send email
    pass

5. Metrics and Monitoring

pip install prometheus-fastapi-instrumentator
from prometheus_fastapi_instrumentator import Instrumentator
 
app = FastAPI()
Instrumentator().instrument(app).expose(app)

Metrics available at /metrics.

Summary

You've built a production-ready REST API with:

  • FastAPI — modern async framework
  • SQLAlchemy 2.0 — typed ORM with async/await
  • Pydantic v2 — data validation
  • Alembic — database migrations
  • pytest — test coverage
  • OpenAPI — auto-documentation
  • CORS — frontend ready
  • Docker — containerization

Next Steps:

  1. Add JWT authentication
  2. Implement Redis caching
  3. Set up CI/CD (GitHub Actions, GitLab CI)
  4. Add monitoring (Prometheus + Grafana)
  5. Deploy to production (Railway, Fly.io, DigitalOcean)

Useful Links:

Keep learning FastAPI — it's a powerful tool for building fast and reliable APIs!