In this guide, we'll build a production-ready REST API for retrieving motivational quotes, complete with validation, database migrations, auto-generated documentation, and test coverage.
What you'll build:
- 🚀 Fast FastAPI backend with async/await
- 🗄️ PostgreSQL with SQLAlchemy 2.0
- ✅ Pydantic for data validation
- 📚 Auto-generated OpenAPI documentation
- 🧪 Test coverage with pytest
- 🔄 Migrations with Alembic
Target audience: Developers familiar with Python who want to learn FastAPI and modern API development approaches.
Application Architecture
Project Structure: Why This Way?
quotes-api/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI entry point
│ ├── config.py # Settings (DATABASE_URL, etc.)
│ ├── database.py # Database connection
│ ├── models.py # SQLAlchemy models (DB tables)
│ ├── schemas.py # Pydantic schemas (DTOs for API)
│ ├── crud.py # CRUD operations
│ └── routers/
│ ├── __init__.py
│ └── quotes.py # Quote endpoints
├── alembic/ # Database migrations
├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ └── test_quotes.py
├── requirements.txt
├── .env # Environment variables
└── README.md
Important to understand: models.py and schemas.py are NOT the same thing!
models.py vs schemas.py: What's the Difference?
This is a common source of confusion for FastAPI newcomers. Let's break it down:
models.py — SQLAlchemy Models (Database Layer)
- Describe database table structure
- Work directly with the database
- Contain SQLAlchemy types:
Integer, String, DateTime
- Example:
id: Mapped[int] = mapped_column(Integer, primary_key=True)
schemas.py — Pydantic Models (API Layer, DTOs)
- Describe API data format (Data Transfer Objects)
- Validate incoming/outgoing JSON
- Contain Python types:
int, str, datetime
- Example:
id: int, content: str
Why separate them?
- Different responsibilities — DB model may have fields that shouldn't be returned in API (e.g., password_hash)
- Different validation — API might require email format, while DB just stores a string
- Flexibility — Can change API format without changing DB structure and vice versa
Example:
# models.py (Database)
class User(Base):
id: Mapped[int]
email: Mapped[str]
password_hash: Mapped[str] # ⚠️ Should NOT go to API!
created_at: Mapped[datetime]
# schemas.py (API)
class UserResponse(BaseModel):
id: int
email: str # ✅ Without password!
# created_at can be hidden from user
Why Not Extract DTOs to a Separate dto/ Folder?
For small projects (like ours):
- ✅ Single schemas.py file — easier navigation
- ✅ All Pydantic schemas in one place
- ✅ Less nesting (app/schemas.py instead of app/dto/quote_dto.py)
For large projects (10+ entities):
app/
├── quotes/
│ ├── models.py # Quote SQLAlchemy model
│ ├── schemas.py # QuoteCreate, QuoteResponse
│ ├── crud.py # CRUD for quotes
│ └── router.py # Endpoints
├── users/
│ ├── models.py
│ ├── schemas.py
│ ├── crud.py
│ └── router.py
Recommendation: Start with the flat structure (app/schemas.py) and switch to a modular one when the project grows (5+ files in app/).
Why Is crud.py Separate from models.py?
models.py — what (data structure)
crud.py — how (operations on data)
# models.py — declaratively describe table
class Quote(Base):
id: Mapped[int]
content: Mapped[str]
# crud.py — imperatively describe logic
async def get_quote(db, quote_id):
result = await db.execute(...)
return result.scalar_one_or_none()
Why separate?
- Single Responsibility Principle — model only knows about structure
- Testability — easy to mock CRUD functions
- Reusability — one model, different CRUD (admin vs user API; see the sketch below)
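To make the last point concrete, here is a hypothetical crud_admin.py sketch: it reuses the same Quote model defined later in Step 4 but exposes an operation the public crud.py never offers (the file name and function are illustrative, not part of the project layout above):

# crud_admin.py (illustrative only)
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession

from app.models import Quote


async def purge_quotes_by_author(db: AsyncSession, author: str) -> int:
    """Admin-only bulk delete built on the same Quote model."""
    result = await db.execute(delete(Quote).where(Quote.author == author))
    await db.commit()
    return result.rowcount  # number of rows removed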
Technology Stack
| Component | Technology | Version |
|---|---|---|
| Framework | FastAPI | 0.109+ |
| ORM | SQLAlchemy | 2.0+ |
| Validation | Pydantic | 2.0+ |
| Database | PostgreSQL | 15+ |
| Migrations | Alembic | 1.13+ |
| Testing | pytest + httpx | latest |
| Server | uvicorn | 0.27+ |
Step 1: Installing Dependencies
requirements.txt
# Core
fastapi==0.109.0
uvicorn[standard]==0.27.0
python-dotenv==1.0.0
# Database
sqlalchemy==2.0.25
psycopg2-binary==2.9.9
asyncpg==0.29.0  # async driver used in Step 3
alembic==1.13.1
# Validation
pydantic==2.5.3
pydantic-settings==2.1.0
# Testing
pytest==7.4.4
pytest-asyncio==0.23.3
httpx==0.26.0
Installation
Important: Examples use pip for compatibility and simplicity, but modern projects should use Poetry or uv for dependency management.
With pip (traditional approach):
# Create virtual environment
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# Install dependencies
pip install -r requirements.txt
With Poetry (recommended for new projects):
# Initialize project
poetry init
# Add dependencies
poetry add fastapi uvicorn[standard] sqlalchemy psycopg2-binary
# Install and activate environment
poetry install
poetry shell
With uv (fastest package manager):
# Create project
uv venv
source .venv/bin/activate
# Install dependencies
uv pip install -r requirements.txt
# Or directly
uv pip install fastapi uvicorn[standard] sqlalchemy
Learn more about modern tools: See Poetry and uv Guide — comparison of pip, Poetry, uv with practical examples.
Why Are Poetry/uv Better Than pip?
pip:
- ❌ Doesn't guarantee reproducibility (no lock file)
- ❌ Slow dependency installation
- ❌ Manual management of requirements.txt and requirements-dev.txt
Poetry:
- ✅ poetry.lock guarantees identical versions
- ✅ Automatic conflict resolution
- ✅ Built-in dev/prod dependency separation
uv:
- ✅ 10-100x faster than pip (written in Rust)
- ✅ Compatible with pip (drop-in replacement)
- ✅ Automatic virtual environment management
Recommendation: Use pip for learning, Poetry or uv for production projects.
Step 2: Environment Configuration
.env File
# Database
DATABASE_URL=postgresql://postgres:password@localhost:5432/quotes_db
# App
APP_NAME="Quotes API"
APP_VERSION="1.0.0"
DEBUG=True
# CORS (for frontend)
# pydantic-settings reads list fields as JSON, hence the bracket syntax
ALLOWED_ORIGINS=["http://localhost:3000","http://localhost:8000"]
app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Application settings from environment variables"""
# Database
database_url: str
# App
app_name: str = "Quotes API"
app_version: str = "1.0.0"
debug: bool = False
# CORS
allowed_origins: list[str] = ["http://localhost:3000"]
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
settings = Settings()
Step 3: Database Connection
app/database.py
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
async_sessionmaker,
)
from sqlalchemy.orm import DeclarativeBase
from app.config import settings
# Create async engine
# Replace postgresql:// with postgresql+asyncpg://
DATABASE_URL = settings.database_url.replace(
"postgresql://", "postgresql+asyncpg://"
)
engine = create_async_engine(
DATABASE_URL,
echo=settings.debug, # SQL logging in dev
future=True,
)
# Session factory
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
# Base class for models
class Base(DeclarativeBase):
pass
# Dependency for getting session
async def get_db() -> AsyncSession:
"""Creates and closes database session"""
async with AsyncSessionLocal() as session:
yield session
Why async/await?
Traditional sync approach:
# Blocks entire process during DB query
result = db.execute(query) # ⏸️ All other requests wait
Async approach:
# While waiting for DB, process other requests
result = await db.execute(query) # ⚡ Parallelize work
Result: FastAPI can handle thousands of concurrent requests on a single worker.
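A toy illustration of the event loop at work (pure asyncio, no database; the one-second sleep stands in for a slow query):

import asyncio


async def fake_db_call(i: int) -> int:
    await asyncio.sleep(1)  # stand-in for an awaited DB query
    return i


async def main() -> None:
    # Ten "queries" finish in about 1 second total, not 10,
    # because the event loop switches tasks while each one waits.
    print(await asyncio.gather(*(fake_db_call(i) for i in range(10))))


asyncio.run(main())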
Why expire_on_commit=False?
By default SQLAlchemy invalidates all objects after commit():
quote = Quote(content="Test")
db.add(quote)
await db.commit()
print(quote.id) # ❌ DetachedInstanceError — object detached from session!
With expire_on_commit=False:
quote = Quote(content="Test")
db.add(quote)
await db.commit()
print(quote.id) # ✅ Works — object still accessible
Important: In FastAPI this is safe because each request gets its own session.
Why get_db() as Dependency?
Problem: Need to create and close DB session for each request.
Without dependency (bad):
@router.get("/quotes/{id}")
async def get_quote(id: int):
db = AsyncSessionLocal() # ❌ Need to close manually!
quote = await db.get(Quote, id)
await db.close() # ❌ Forgot — connection leak!
return quote
With dependency (good):
@router.get("/quotes/{id}")
async def get_quote(id: int, db: AsyncSession = Depends(get_db)):
quote = await db.get(Quote, id) # ✅ Session auto-closes
return quote
Bonus: Easy to replace get_db() with test database in tests.
Important: Use asyncpg driver for async PostgreSQL work.
pip install asyncpg
Step 4: Database Models
app/models.py
from datetime import datetime
from sqlalchemy import String, Text, DateTime, Integer
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class Quote(Base):
"""Quote model in database"""
__tablename__ = "quotes"
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
content: Mapped[str] = mapped_column(Text, nullable=False)
author: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
category: Mapped[str | None] = mapped_column(String(100), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
nullable=False,
)
def __repr__(self) -> str:
return f"<Quote(id={self.id}, author='{self.author}')>"Why SQLAlchemy 2.0 style?
- Typed mappings with Mapped[T]
- Better type hints support
- Fewer runtime errors (see the comparison sketch below)
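For comparison, the same table declared in the legacy 1.x Column style and in the 2.0 typed style used throughout this guide (a standalone sketch; the class and table names are invented for the example):

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class QuoteOldStyle(Base):
    """SQLAlchemy 1.x declarative style: type checkers only see Column objects"""
    __tablename__ = "quotes_old"
    id = Column(Integer, primary_key=True)
    author = Column(String(200), nullable=False)


class QuoteNewStyle(Base):
    """SQLAlchemy 2.0 typed style: the IDE and mypy know id is an int"""
    __tablename__ = "quotes_new"
    id: Mapped[int] = mapped_column(primary_key=True)
    author: Mapped[str] = mapped_column(String(200))  # non-Optional, so NOT NULL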
Step 5: Pydantic Schemas
app/schemas.py
from datetime import datetime
from pydantic import BaseModel, Field, ConfigDict
class QuoteBase(BaseModel):
"""Base quote schema"""
content: str = Field(..., min_length=1, max_length=1000)
author: str = Field(..., min_length=1, max_length=200)
category: str | None = Field(None, max_length=100)
class QuoteCreate(QuoteBase):
"""Schema for creating quote"""
pass
class QuoteUpdate(BaseModel):
"""Schema for updating quote"""
content: str | None = Field(None, min_length=1, max_length=1000)
author: str | None = Field(None, min_length=1, max_length=200)
category: str | None = Field(None, max_length=100)
class QuoteResponse(QuoteBase):
"""Schema for quote response"""
id: int
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class QuoteList(BaseModel):
"""Schema for quote list"""
quotes: list[QuoteResponse]
total: int
Why Three Different Schemas?
This is the Input-Output segregation pattern. Let's break it down:
1. QuoteCreate (Input for POST)
class QuoteCreate(BaseModel):
content: str = Field(..., min_length=1)
author: str = Field(..., min_length=1)
Why are all fields required?
- When creating a quote, must provide content and author
- Field(...) means "required field"
- min_length=1 protects against empty strings
2. QuoteUpdate (Input for PATCH)
class QuoteUpdate(BaseModel):
content: str | None = None # Optional!
author: str | None = None # Optional!
Why are all fields optional?
- PATCH = partial update
- Want to change only author → send {"author": "New"}
- Want to change only content → send {"content": "New"}
Without schema separation:
# ❌ Bad — one schema for everything
await client.post("/quotes/", json={}) # Creates empty quote!3. QuoteResponse (Output for GET)
class QuoteResponse(QuoteBase):
id: int # ✅ Added ID (generated by DB)
created_at: datetime # ✅ Added timestamp
Why separate schema for response?
- Client doesn't send id when creating (DB generates it)
- Client doesn't send created_at (DB sets it)
- But in response we must return these fields
model_config = ConfigDict(from_attributes=True) — Pydantic magic:
# Without from_attributes:
db_quote = Quote(id=1, content="Test") # SQLAlchemy object
return QuoteResponse(**db_quote) # ❌ Error!
# With from_attributes:
return QuoteResponse.model_validate(db_quote) # ✅ Works!
FastAPI automatically calls model_validate() when using response_model=QuoteResponse.
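You can check the Field constraints from this step directly in a Python shell; a quick sketch (the exact error wording comes from Pydantic and may vary between releases):

from pydantic import ValidationError

from app.schemas import QuoteCreate

try:
    QuoteCreate(content="", author="Oscar Wilde")  # violates min_length=1
except ValidationError as exc:
    print(exc.error_count())       # 1
    print(exc.errors()[0]["loc"])  # ('content',)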
Step 6: CRUD Operations
app/crud.py
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Quote
from app.schemas import QuoteCreate, QuoteUpdate
async def create_quote(db: AsyncSession, quote: QuoteCreate) -> Quote:
"""Creates new quote"""
# Convert Pydantic model to SQLAlchemy model
db_quote = Quote(**quote.model_dump())
db.add(db_quote) # Add to session
await db.commit() # Save to DB
await db.refresh(db_quote) # Refresh object (get ID)
return db_quote
async def get_quote(db: AsyncSession, quote_id: int) -> Quote | None:
"""Gets quote by ID"""
result = await db.execute(select(Quote).where(Quote.id == quote_id))
return result.scalar_one_or_none() # None if not found
async def get_quotes(
db: AsyncSession,
skip: int = 0,
limit: int = 100,
author: str | None = None,
category: str | None = None,
) -> list[Quote]:
"""Gets list of quotes with filtering"""
query = select(Quote)
if author:
query = query.where(Quote.author.ilike(f"%{author}%"))
if category:
query = query.where(Quote.category == category)
query = query.offset(skip).limit(limit)
result = await db.execute(query)
return list(result.scalars().all())
async def get_quotes_count(
db: AsyncSession,
author: str | None = None,
category: str | None = None,
) -> int:
"""Counts total quotes"""
query = select(func.count(Quote.id))
if author:
query = query.where(Quote.author.ilike(f"%{author}%"))
if category:
query = query.where(Quote.category == category)
result = await db.execute(query)
return result.scalar_one()
async def get_random_quote(db: AsyncSession) -> Quote | None:
"""Gets random quote"""
# PostgreSQL specific: ORDER BY RANDOM()
query = select(Quote).order_by(func.random()).limit(1)
result = await db.execute(query)
return result.scalar_one_or_none()
async def update_quote(
db: AsyncSession,
quote_id: int,
quote_update: QuoteUpdate,
) -> Quote | None:
"""Updates quote"""
db_quote = await get_quote(db, quote_id)
if not db_quote:
return None
update_data = quote_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_quote, field, value)
await db.commit()
await db.refresh(db_quote)
return db_quote
async def delete_quote(db: AsyncSession, quote_id: int) -> bool:
"""Deletes quote"""
db_quote = await get_quote(db, quote_id)
if not db_quote:
return False
await db.delete(db_quote)
await db.commit()
return True
Key CRUD Concepts
1. Why quote.model_dump() Instead of quote.dict()?
Pydantic v2 changed the API:
# Pydantic v1 (old way)
quote.dict() # ❌ Deprecated
# Pydantic v2 (new way)
quote.model_dump() # ✅ Current method
2. Why await db.refresh(db_quote) After Commit?
db_quote = Quote(content="Test") # id = None (not saved yet)
db.add(db_quote)
await db.commit() # DB generates id = 1
# Without refresh: the PK is already populated (SQLAlchemy reads it back from the INSERT),
# but values generated by the database itself are not reloaded ❌
# With refresh: the row is re-read, so the object matches the DB exactly
await db.refresh(db_quote)
print(db_quote.id) # 1 ✅
3. Why scalar_one_or_none() vs first() or one()?
SQLAlchemy 2.0 style — three options:
result = await db.execute(select(Quote).where(...))
# .one() — strictly one record, else error
quote = result.scalar_one() # ❌ NoResultFound if empty
# .one_or_none() — one record or None
quote = result.scalar_one_or_none() # ✅ None if not found
# .first() — first row or None (a Row object, not a Quote)
quote = result.first() # ⚠️ Prefer result.scalars().first() if you want the ORM object
4. Why query.offset(skip).limit(limit)?
This is pagination (splitting into pages):
# First page: 10 quotes
get_quotes(skip=0, limit=10) # Records 1-10
# Second page: next 10
get_quotes(skip=10, limit=10) # Records 11-20
# Third page
get_quotes(skip=20, limit=10) # Records 21-30
Formula: skip = (page - 1) * limit
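If your endpoint takes a 1-based page number instead of a raw skip value, the conversion is a one-liner (an illustrative helper, not part of the project files):

def page_to_skip(page: int, limit: int) -> int:
    """Convert a 1-based page number into an offset for .offset()/.limit()."""
    return (page - 1) * limit


assert page_to_skip(1, 10) == 0   # records 1-10
assert page_to_skip(3, 10) == 20  # records 21-30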
5. Why update_data = quote_update.model_dump(exclude_unset=True)?
Without exclude_unset:
# Client sent only {"author": "New"}
quote_update = QuoteUpdate(author="New")
quote_update.model_dump()
# {"author": "New", "content": None, "category": None}
# ❌ Overwrites content and category with None!
With exclude_unset=True:
quote_update.model_dump(exclude_unset=True)
# {"author": "New"}
# ✅ Updates only author, leaves rest untouched
Step 7: API Endpoints
app/routers/quotes.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from app import crud, schemas
from app.database import get_db
router = APIRouter(
prefix="/quotes",
tags=["quotes"],
)
@router.post(
"/",
response_model=schemas.QuoteResponse,
status_code=status.HTTP_201_CREATED,
summary="Create quote",
)
async def create_quote(
quote: schemas.QuoteCreate,
db: AsyncSession = Depends(get_db),
):
"""Creates new quote"""
return await crud.create_quote(db, quote)
@router.get(
"/",
response_model=schemas.QuoteList,
summary="Get quote list",
)
async def list_quotes(
skip: int = Query(0, ge=0, description="Skip N records"),
limit: int = Query(10, ge=1, le=100, description="Limit records"),
author: str | None = Query(None, description="Filter by author"),
category: str | None = Query(None, description="Filter by category"),
db: AsyncSession = Depends(get_db),
):
"""Gets list of quotes with pagination and filtering"""
quotes = await crud.get_quotes(
db,
skip=skip,
limit=limit,
author=author,
category=category,
)
total = await crud.get_quotes_count(db, author=author, category=category)
return schemas.QuoteList(quotes=quotes, total=total)
@router.get(
"/random",
response_model=schemas.QuoteResponse,
summary="Get random quote",
)
async def random_quote(db: AsyncSession = Depends(get_db)):
"""Gets random quote"""
quote = await crud.get_random_quote(db)
if not quote:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No quotes found",
)
return quote
@router.get(
"/{quote_id}",
response_model=schemas.QuoteResponse,
summary="Get quote by ID",
)
async def get_quote(
quote_id: int,
db: AsyncSession = Depends(get_db),
):
"""Gets quote by ID"""
quote = await crud.get_quote(db, quote_id)
if not quote:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Quote {quote_id} not found",
)
return quote
@router.patch(
"/{quote_id}",
response_model=schemas.QuoteResponse,
summary="Update quote",
)
async def update_quote(
quote_id: int,
quote_update: schemas.QuoteUpdate,
db: AsyncSession = Depends(get_db),
):
"""Partially updates quote"""
quote = await crud.update_quote(db, quote_id, quote_update)
if not quote:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Quote {quote_id} not found",
)
return quote
@router.delete(
"/{quote_id}",
status_code=status.HTTP_204_NO_CONTENT,
summary="Delete quote",
)
async def delete_quote(
quote_id: int,
db: AsyncSession = Depends(get_db),
):
"""Deletes quote"""
deleted = await crud.delete_quote(db, quote_id)
if not deleted:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Quote {quote_id} not found",
)
Why These Decorators?
1. response_model=schemas.QuoteResponse
@router.get("/quotes/{quote_id}", response_model=schemas.QuoteResponse)
async def get_quote(quote_id: int, db: AsyncSession = Depends(get_db)):
return await crud.get_quote(db, quote_id)
What does response_model do?
- ✅ Validates response (error if function returns wrong type)
- ✅ Converts SQLAlchemy object to JSON
- ✅ Filters fields (won't return password_hash if it's in the model)
- ✅ Generates OpenAPI schema for documentation
Without response_model (bad):
return db_quote # ❌ Returns SQLAlchemy object, JSON won't serialize!
2. status_code=status.HTTP_201_CREATED
REST API semantics:
# POST (create) → 201 Created
@router.post("/", status_code=201)
# GET (read) → 200 OK (default)
@router.get("/")
# PATCH (update) → 200 OK
@router.patch("/{id}")
# DELETE (delete) → 204 No Content
@router.delete("/{id}", status_code=204)Why status.HTTP_201_CREATED instead of 201?
# Magic numbers (bad)
@router.post("/", status_code=201) # ❌ What does 201 mean?
# Constants (good)
@router.post("/", status_code=status.HTTP_201_CREATED) # ✅ Clear!3. Query() for Parameter Validation
skip: int = Query(0, ge=0, description="Skip N records")
limit: int = Query(10, ge=1, le=100, description="Limit records")
Why Query() if we can just use skip: int = 0?
Without Query:
async def list_quotes(skip: int = 0, limit: int = 10):
# ❌ Client can send skip=-10, limit=999999
With Query:
async def list_quotes(
skip: int = Query(0, ge=0), # ✅ >= 0
limit: int = Query(10, ge=1, le=100) # ✅ from 1 to 100
):
If client sends ?limit=500, gets 422 Unprocessable Entity:
{
"detail": [
{
"type": "less_than_equal",
"loc": ["query", "limit"],
"msg": "Input should be less than or equal to 100",
"input": "500"
}
]
}
4. Why summary and tags?
@router.get("/random", tags=["quotes"], summary="Get random quote")Generates beautiful documentation:
- tags — groups endpoints in Swagger UI
- summary — brief description in list
- Docstring — detailed description when expanded
Key principles:
- Use Depends(get_db) for session injection
- Query() for query parameter validation
- HTTP status codes (201, 404, 204)
- Descriptions for auto-documentation (see the sketch below for documenting the 404 response too)
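As an optional documentation touch, FastAPI's responses parameter lets you advertise the 404 case in the OpenAPI schema as well; a sketch of how the existing GET /quotes/{quote_id} decorator could be extended, using the same file and imports as app/routers/quotes.py:

@router.get(
    "/{quote_id}",
    response_model=schemas.QuoteResponse,
    responses={status.HTTP_404_NOT_FOUND: {"description": "Quote not found"}},
    summary="Get quote by ID",
)
async def get_quote(quote_id: int, db: AsyncSession = Depends(get_db)):
    ...  # same implementation as above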
Step 8: Main Application File
app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.database import engine, Base
from app.routers import quotes
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifecycle events: startup and shutdown"""
# Startup: create tables (only for dev!)
# In production use Alembic migrations
if settings.debug:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# Shutdown: close connections
await engine.dispose()
app = FastAPI(
title=settings.app_name,
version=settings.app_version,
description="REST API for motivational quotes",
lifespan=lifespan,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(quotes.router)
@app.get("/", tags=["root"])
async def root():
"""Health check endpoint"""
return {
"app": settings.app_name,
"version": settings.app_version,
"status": "running",
}
Why Do We Need lifespan?
Problem: Need to execute code on app startup/shutdown.
Old way (deprecated):
@app.on_event("startup")
async def startup():
# Code on startup
@app.on_event("shutdown")
async def shutdown():
# Code on shutdown
New way (recommended):
@asynccontextmanager
async def lifespan(app: FastAPI):
# ⬇️ Code BEFORE yield — startup
print("Starting up...")
yield
# ⬇️ Code AFTER yield — shutdown
print("Shutting down...")Why await engine.dispose() in shutdown?
# Close all DB connections on shutdown
await engine.dispose()Without dispose:
- PostgreSQL connections remain open
- Connection pool might exhaust on restart
- In production this is a resource leak
Why CORS middleware?
Problem: Frontend on localhost:3000 can't call API on localhost:8000.
Browser blocks requests:
Access to fetch at 'http://localhost:8000/quotes'
from origin 'http://localhost:3000' has been blocked by CORS policy
Solution:
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # Allow frontend
allow_credentials=True,
allow_methods=["*"], # All HTTP methods (GET, POST, etc.)
allow_headers=["*"], # All headers
)
Production setup:
# .env
ALLOWED_ORIGINS=["https://myapp.com","https://www.myapp.com"]
# config.py
allowed_origins: list[str] = ["https://myapp.com"]
⚠️ Never use allow_origins=["*"] in production!
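One gotcha with the list-typed allowed_origins field: pydantic-settings decodes list fields from environment variables as JSON, which is why the .env examples above use the bracketed form. If you prefer a plain comma-separated variable, one workaround is to read it as a string and split it yourself. A sketch (the str-typed field and the property replace the list[str] field from Step 2 in this variant):

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # e.g. ALLOWED_ORIGINS=https://myapp.com,https://www.myapp.com
    allowed_origins: str = "http://localhost:3000"

    @property
    def allowed_origins_list(self) -> list[str]:
        return [origin.strip() for origin in self.allowed_origins.split(",")]


# main.py would then pass allow_origins=settings.allowed_origins_list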
Step 9: Migrations with Alembic
Initializing Alembic
alembic init alembic
alembic/env.py
Update env.py to support async:
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.config import settings
from app.database import Base
from app.models import Quote # Import models!
# Alembic Config
config = context.config
# Set DATABASE_URL from settings (async_engine_from_config below needs the asyncpg driver)
config.set_main_option("sqlalchemy.url", settings.database_url.replace("postgresql://", "postgresql+asyncpg://"))
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
import asyncio
asyncio.run(run_migrations_online())
Creating First Migration
# Auto-generate migration
alembic revision --autogenerate -m "Create quotes table"
# Apply migration
alembic upgrade head
Step 10: Running the Application
Local Run
# Development mode with hot reload
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
Seeding Test Data
Create scripts/seed_data.py:
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import AsyncSessionLocal
from app.models import Quote
quotes_data = [
{
"content": "Be yourself; everyone else is already taken.",
"author": "Oscar Wilde",
"category": "inspiration",
},
{
"content": "The only way to do great work is to love what you do.",
"author": "Steve Jobs",
"category": "motivation",
},
{
"content": "Life is what happens when you're busy making other plans.",
"author": "John Lennon",
"category": "life",
},
]
async def seed_quotes():
"""Adds test quotes to database"""
async with AsyncSessionLocal() as db:
for data in quotes_data:
quote = Quote(**data)
db.add(quote)
await db.commit()
print(f"✅ Added {len(quotes_data)} quotes")
if __name__ == "__main__":
asyncio.run(seed_quotes())
Run:
python scripts/seed_data.py
Check API
Open auto-generated documentation:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
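Besides the interactive docs, you can hit the endpoints from a short script. A throwaway smoke test, assuming the server from the previous step is running on localhost:8000:

import httpx

with httpx.Client(base_url="http://localhost:8000") as client:
    created = client.post(
        "/quotes/",
        json={"content": "Stay hungry, stay foolish.", "author": "Steve Jobs"},
    )
    print(created.status_code)  # 201
    quote_id = created.json()["id"]

    print(client.get(f"/quotes/{quote_id}").json()["author"])  # Steve Jobs
    print(client.get("/quotes/", params={"author": "Jobs"}).json()["total"])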
Step 11: Testing
tests/conftest.py
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.main import app
from app.database import Base, get_db
# Test database (in-memory SQLite)
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
@pytest.fixture(scope="function")
async def db_session():
"""Creates test database session"""
engine = create_async_engine(TEST_DATABASE_URL, echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
TestSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async with TestSessionLocal() as session:
yield session
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
@pytest.fixture(scope="function")
async def client(db_session):
"""Creates test HTTP client"""
async def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
async with AsyncClient(app=app, base_url="http://test") as ac:
yield ac
app.dependency_overrides.clear()
tests/test_quotes.py
import pytest
@pytest.mark.asyncio
async def test_create_quote(client):
"""Test creating quote"""
response = await client.post(
"/quotes/",
json={
"content": "Test quote",
"author": "Test Author",
"category": "test",
},
)
assert response.status_code == 201
data = response.json()
assert data["content"] == "Test quote"
assert data["author"] == "Test Author"
assert "id" in data
@pytest.mark.asyncio
async def test_get_quote(client):
"""Test getting quote"""
# Create quote
create_response = await client.post(
"/quotes/",
json={"content": "Get test", "author": "Author"},
)
quote_id = create_response.json()["id"]
# Get quote
response = await client.get(f"/quotes/{quote_id}")
assert response.status_code == 200
data = response.json()
assert data["id"] == quote_id
assert data["content"] == "Get test"
@pytest.mark.asyncio
async def test_get_nonexistent_quote(client):
"""Test getting nonexistent quote"""
response = await client.get("/quotes/999")
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_list_quotes(client):
"""Test getting quote list"""
# Create several quotes
for i in range(3):
await client.post(
"/quotes/",
json={"content": f"Quote {i}", "author": f"Author {i}"},
)
# Get list
response = await client.get("/quotes/")
assert response.status_code == 200
data = response.json()
assert data["total"] == 3
assert len(data["quotes"]) == 3
@pytest.mark.asyncio
async def test_filter_quotes_by_author(client):
"""Test filtering by author"""
await client.post(
"/quotes/",
json={"content": "Quote 1", "author": "Oscar Wilde"},
)
await client.post(
"/quotes/",
json={"content": "Quote 2", "author": "Steve Jobs"},
)
response = await client.get("/quotes/?author=Oscar")
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
assert data["quotes"][0]["author"] == "Oscar Wilde"
@pytest.mark.asyncio
async def test_update_quote(client):
"""Test updating quote"""
# Create quote
create_response = await client.post(
"/quotes/",
json={"content": "Original", "author": "Author"},
)
quote_id = create_response.json()["id"]
# Update
response = await client.patch(
f"/quotes/{quote_id}",
json={"content": "Updated"},
)
assert response.status_code == 200
data = response.json()
assert data["content"] == "Updated"
assert data["author"] == "Author" # Unchanged
@pytest.mark.asyncio
async def test_delete_quote(client):
"""Test deleting quote"""
# Create quote
create_response = await client.post(
"/quotes/",
json={"content": "Delete me", "author": "Author"},
)
quote_id = create_response.json()["id"]
# Delete
response = await client.delete(f"/quotes/{quote_id}")
assert response.status_code == 204
# Verify deleted
get_response = await client.get(f"/quotes/{quote_id}")
assert get_response.status_code == 404
@pytest.mark.asyncio
async def test_random_quote(client):
"""Test getting random quote"""
# Create quote
await client.post(
"/quotes/",
json={"content": "Random quote", "author": "Author"},
)
# Get random
response = await client.get("/quotes/random")
assert response.status_code == 200
data = response.json()
assert "content" in data
assert "author" in dataRunning Tests
# Install SQLite dependency
pip install aiosqlite
# Run tests
pytest
# With coverage (requires pytest-cov: pip install pytest-cov)
pytest --cov=app --cov-report=term-missing
Production Deployment
Docker
Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# Dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Application code
COPY app ./app
COPY alembic ./alembic
COPY alembic.ini .
# Run migrations and server
CMD alembic upgrade head && \
uvicorn app.main:app --host 0.0.0.0 --port 8000
docker-compose.yml:
version: "3.8"
services:
db:
image: postgres:15
environment:
POSTGRES_DB: quotes_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
api:
build: .
environment:
DATABASE_URL: postgresql://postgres:password@db:5432/quotes_db
APP_NAME: "Quotes API"
DEBUG: "False"
ports:
- "8000:8000"
depends_on:
- db
volumes:
postgres_data:
Run:
docker-compose up --build
Production Settings
Recommendations for production:
- Disable debug mode: DEBUG=False in .env
- Use Alembic for migrations (not create_all())
- Configure connection pool:
engine = create_async_engine(
DATABASE_URL,
pool_size=20,
max_overflow=0,
pool_pre_ping=True,
)
- Add rate limiting (e.g., slowapi)
- Configure logging:
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
- Use environment-specific configs:
class Settings(BaseSettings):
environment: str = "development"
@property
def is_production(self) -> bool:
return self.environment == "production"
Improvements and Extensions
1. Authentication (JWT)
pip install python-jose[cryptography] passlib[bcrypt]
Add User model, /auth/register, /auth/login endpoints.
2. Caching (Redis)
pip install redis[hiredis]
Cache /quotes/random for speedup:
@router.get("/random")
async def random_quote(
db: AsyncSession = Depends(get_db),
cache: Redis = Depends(get_redis),
):
cached = await cache.get("random_quote")
if cached:
return json.loads(cached)
quote = await crud.get_random_quote(db)
# SQLAlchemy objects aren't JSON-serializable, so convert via the Pydantic schema first
payload = schemas.QuoteResponse.model_validate(quote).model_dump(mode="json")
await cache.setex("random_quote", 60, json.dumps(payload))
return payload
3. Full-Text Search
Add endpoint /quotes/search?q=motivation:
from sqlalchemy import or_
async def search_quotes(db: AsyncSession, query: str) -> list[Quote]:
result = await db.execute(
select(Quote).where(
or_(
Quote.content.ilike(f"%{query}%"),
Quote.author.ilike(f"%{query}%"),
)
)
)
return list(result.scalars().all())
4. Background Tasks (Celery)
For sending daily quote emails:
pip install celery[redis]
from celery import Celery
celery = Celery("tasks", broker="redis://localhost:6379")
@celery.task
def send_daily_quote():
# Get random quote and send email
pass
5. Metrics and Monitoring
pip install prometheus-fastapi-instrumentator
from prometheus_fastapi_instrumentator import Instrumentator
app = FastAPI()
Instrumentator().instrument(app).expose(app)
Metrics available at /metrics.
Summary
You've built a production-ready REST API with:
- FastAPI — modern async framework
- SQLAlchemy 2.0 — typed ORM with async/await
- Pydantic v2 — data validation
- Alembic — database migrations
- pytest — test coverage
- OpenAPI — auto-documentation
- CORS — frontend ready
- Docker — containerization
Next Steps:
- Add JWT authentication
- Implement Redis caching
- Set up CI/CD (GitHub Actions, GitLab CI)
- Add monitoring (Prometheus + Grafana)
- Deploy to production (Railway, Fly.io, DigitalOcean)
Keep learning FastAPI — it's a powerful tool for building fast and reliable APIs!
