Implement FastAPI authentication with JWT and OAuth2 for secure API development

Intermediate 45 min Apr 06, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Build secure FastAPI applications with JWT token authentication and OAuth2 password bearer flows. Learn to implement role-based access control and protect API endpoints with production-ready security patterns.

Prerequisites

  • Basic Python knowledge
  • Understanding of REST APIs
  • Familiarity with HTTP authentication

What this solves

FastAPI applications need robust authentication to protect API endpoints from unauthorized access. This tutorial implements JWT (JSON Web Token) authentication with OAuth2 password bearer flows, providing secure user login, token generation, and role-based access control for production APIs.

Step-by-step installation

Update system packages

Start by updating your package manager to ensure you have the latest packages available.

# Ubuntu / Debian
sudo apt update && sudo apt upgrade -y

# AlmaLinux / Rocky Linux
sudo dnf update -y

Install Python 3.12 and development tools

Install Python 3.12 along with pip and virtual environment tools for FastAPI development.

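# Ubuntu / Debian (Debian 12 ships Python 3.11 by default, so python3.12 may need an additional package source)
sudo apt install -y python3.12 python3.12-venv python3.12-dev python3-pip build-essential

# AlmaLinux / Rocky Linux (the venv module ships with the python3.12 package)
sudo dnf install -y python3.12 python3.12-devel python3.12-pip gcc gcc-c++ make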

Create project directory and virtual environment

Set up a dedicated directory for your FastAPI authentication project with an isolated Python environment.

mkdir fastapi-auth && cd fastapi-auth
python3.12 -m venv fastapi-env
source fastapi-env/bin/activate

Install FastAPI and authentication dependencies

Install FastAPI, Uvicorn ASGI server, and the required packages for JWT authentication and password hashing.

pip install "fastapi[all]" "uvicorn[standard]" "python-jose[cryptography]" "passlib[bcrypt]" python-multipart

Create the main application structure

Set up the basic directory structure for your FastAPI authentication application.

mkdir -p app/{auth,models,routers,utils}
touch app/__init__.py app/main.py app/auth/__init__.py app/models/__init__.py app/routers/__init__.py app/utils/__init__.py

Configure environment variables

Create a .env file in the project root to hold the JWT secret and application settings.

SECRET_KEY=your-256-bit-secret-key-here-change-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7

Security Warning: Generate a strong SECRET_KEY using openssl rand -hex 32 and never commit it to version control.
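A missing or placeholder secret weakens every token the API issues, so it can help to validate the settings once at startup. The following is a minimal sketch using only the standard library. The Settings dataclass, the load_settings function, and the 32-character minimum are illustrative assumptions and are not part of this tutorial's code.

```python
from dataclasses import dataclass
from typing import Mapping

# Placeholder value from the sample .env above; it must never reach production.
PLACEHOLDER_KEY = "your-256-bit-secret-key-here-change-in-production"


@dataclass(frozen=True)
class Settings:
    secret_key: str
    algorithm: str
    access_token_expire_minutes: int
    refresh_token_expire_days: int


def load_settings(env: Mapping[str, str]) -> Settings:
    """Build Settings from an environment mapping, failing fast on weak values."""
    secret = env.get("SECRET_KEY", "")
    if not secret or secret == PLACEHOLDER_KEY:
        raise ValueError("SECRET_KEY is missing or still set to the placeholder")
    if len(secret) < 32:  # assumed minimum length for this sketch
        raise ValueError("SECRET_KEY is too short")
    return Settings(
        secret_key=secret,
        algorithm=env.get("ALGORITHM", "HS256"),
        access_token_expire_minutes=int(env.get("ACCESS_TOKEN_EXPIRE_MINUTES", "30")),
        refresh_token_expire_days=int(env.get("REFRESH_TOKEN_EXPIRE_DAYS", "7")),
    )
```

One way to use it would be to call load_settings(os.environ) right after load_dotenv(), so a misconfigured deployment stops before it serves any request.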

Create user models and database schemas

Define Pydantic models for user authentication and database schemas in app/models/user.py.

from pydantic import BaseModel, ConfigDict, EmailStr
from typing import Optional, List
from enum import Enum

class UserRole(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"

class UserBase(BaseModel):
    email: EmailStr
    username: str
    full_name: Optional[str] = None
    roles: List[UserRole] = [UserRole.USER]
    is_active: bool = True

class UserCreate(UserBase):
    password: str

class UserInDB(UserBase):
    id: int
    hashed_password: str

class User(UserBase):
    id: int

    # Pydantic v2 configuration; replaces the deprecated class-based Config
    model_config = ConfigDict(from_attributes=True)

Create token models

Define Pydantic models for JWT tokens and token data validation in app/models/token.py.

from pydantic import BaseModel
from typing import Optional, List
from app.models.user import UserRole

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

class TokenData(BaseModel):
    username: Optional[str] = None
    user_id: Optional[int] = None
    roles: List[UserRole] = []
    expires_at: Optional[float] = None

Implement password hashing utilities

Create password hashing and verification functions using bcrypt, together with the JWT helpers, in app/utils/security.py.

from passlib.context import CryptContext
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import jwt, JWTError
import os
from dotenv import load_dotenv

load_dotenv()

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

SECRET_KEY = os.getenv("SECRET_KEY")
if not SECRET_KEY:
    # Fail fast instead of signing tokens with a missing key
    raise RuntimeError("SECRET_KEY is not set; add it to your .env file")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30))
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", 7))

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a plain password against its hash."""
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """Generate password hash."""
    return pwd_context.hash(password)

def create_access_token(
    data: dict, expires_delta: Optional[timedelta] = None
) -> str:
    """Create JWT access token."""
    to_encode = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime
    now = datetime.now(timezone.utc)
    if expires_delta:
        expire = now + expires_delta
    else:
        expire = now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode.update({"exp": expire, "type": "access"})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def create_refresh_token(data: dict) -> str:
    """Create JWT refresh token."""
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({"exp": expire, "type": "refresh"})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(token: str, token_type: str = "access") -> Optional[dict]:
    """Verify and decode JWT token; return None if it is invalid, expired, or of the wrong type."""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != token_type:
            return None
        return payload
    except JWTError:
        return None
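To make the helpers above less of a black box, the sketch below builds and checks an HS256 token by hand with the standard library. It is meant only to illustrate the three-segment structure (header, payload, signature) and the signature and expiry checks. The function names encode_hs256 and decode_hs256 are hypothetical, and this is not a replacement for python-jose in the application.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def encode_hs256(claims: dict, secret: str) -> str:
    """Return header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def decode_hs256(token: str, secret: str, now: Optional[float] = None) -> Optional[dict]:
    """Return the claims if the signature matches and the token is unexpired, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _sign(f"{header_b64}.{payload_b64}", secret)
        # Constant-time comparison avoids leaking how many bytes matched
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(payload_b64))
    except (ValueError, TypeError):
        return None
    if not isinstance(header, dict) or header.get("alg") != "HS256":
        return None
    if not isinstance(claims, dict):
        return None
    exp = claims.get("exp")
    current = time.time() if now is None else now
    if exp is not None and (not isinstance(exp, (int, float)) or current >= exp):
        return None
    return claims
```

The payload is only encoded, not encrypted, so anyone holding a token can read its claims. The signature protects integrity, which is why roles can travel in the token but secrets should not.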

Create user database and authentication functions

Implement a simple in-memory user database with lookup and creation functions in app/auth/database.py for demonstration purposes.

from typing import Dict, Optional, List
from app.models.user import UserInDB, UserCreate, UserRole
from app.utils.security import get_password_hash

# In-memory user database (replace with real database in production)
users_db: Dict[str, UserInDB] = {
    "admin@example.com": UserInDB(
        id=1,
        email="admin@example.com",
        username="admin",
        full_name="Admin User",
        roles=[UserRole.ADMIN],
        hashed_password=get_password_hash("admin123"),
        is_active=True
    ),
    "user@example.com": UserInDB(
        id=2,
        email="user@example.com",
        username="testuser",
        full_name="Test User",
        roles=[UserRole.USER],
        hashed_password=get_password_hash("user123"),
        is_active=True
    )
}

def get_user_by_email(email: str) -> Optional[UserInDB]:
    """Get user by email address."""
    return users_db.get(email)

def get_user_by_username(username: str) -> Optional[UserInDB]:
    """Get user by username."""
    for user in users_db.values():
        if user.username == username:
            return user
    return None

def create_user(user: UserCreate) -> UserInDB:
    """Create new user in database."""
    hashed_password = get_password_hash(user.password)
    user_id = len(users_db) + 1
    db_user = UserInDB(
        id=user_id,
        email=user.email,
        username=user.username,
        full_name=user.full_name,
        # Ignore client-supplied roles: self-registration must not grant elevated roles
        roles=[UserRole.USER],
        hashed_password=hashed_password,
        is_active=user.is_active
    )
    users_db[user.email] = db_user
    return db_user
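The database above stores only password hashes, never the passwords themselves. The tutorial relies on bcrypt through passlib for this. Purely to illustrate the underlying idea (a random salt per password, a slow key-derivation function, and a constant-time comparison), here is a stand-in sketch that swaps in PBKDF2 from the standard library. The function names, the storage format, and the iteration count are assumptions made for this example.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 200_000  # assumed work factor for this sketch; tune for your hardware


def hash_password(password: str) -> str:
    """Return 'scheme$iterations$salt$digest' using a fresh random salt."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"pbkdf2_sha256${ITERATIONS}${salt.hex()}${digest.hex()}"


def check_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
        if scheme != "pbkdf2_sha256":
            return False
        expected = bytes.fromhex(digest_hex)
        candidate = hashlib.pbkdf2_hmac(
            "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
        )
    except (ValueError, OverflowError):
        return False
    return hmac.compare_digest(candidate, expected)
```

Because the salt is random, hashing the same password twice gives two different strings, which is also why stored hashes have to be checked with a verify function rather than compared directly.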

Implement OAuth2 authentication scheme

Create OAuth2 password bearer authentication with JWT token validation and user dependency injection in app/auth/oauth2.py.

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from app.utils.security import verify_password, verify_token
from app.auth.database import get_user_by_email, get_user_by_username
from app.models.user import User, UserInDB, UserRole
from app.models.token import TokenData
from typing import Optional, List

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")

def authenticate_user(username: str, password: str) -> Optional[UserInDB]:
    """Authenticate user with username/email and password."""
    user = get_user_by_username(username) or get_user_by_email(username)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Get current user from JWT token."""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    
    payload = verify_token(token, "access")
    if payload is None:
        raise credentials_exception
    
    username: str = payload.get("sub")
    if username is None:
        raise credentials_exception
    
    user = get_user_by_username(username) or get_user_by_email(username)
    if user is None:
        raise credentials_exception
    
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user"
        )
    
    return User(
        id=user.id,
        email=user.email,
        username=user.username,
        full_name=user.full_name,
        roles=user.roles,
        is_active=user.is_active
    )

def require_roles(allowed_roles: List[UserRole]):
    """Dependency to check if user has required roles."""
    def role_checker(current_user: User = Depends(get_current_user)) -> User:
        if not any(role in current_user.roles for role in allowed_roles):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions"
            )
        return current_user
    return role_checker
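The oauth2_scheme dependency is what pulls the token out of the Authorization header before get_current_user ever runs. As a rough sketch of that step, assuming a hypothetical helper named extract_bearer_token (FastAPI's own implementation differs in detail and raises a 401 response instead of returning None):

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not authorization:
        return None
    # Split on the first space only: "<scheme> <credentials>"
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token
```

This also explains the "ensure proper Bearer format" advice: a header that sends the raw token without the Bearer prefix is rejected before any JWT validation takes place.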

Create authentication routes

Implement login, registration, token refresh, and current-user endpoints in app/routers/auth.py.

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from app.models.user import User, UserCreate, UserRole
from app.models.token import Token
from app.auth.oauth2 import authenticate_user, get_current_user, require_roles
from app.auth.database import create_user, get_user_by_email, get_user_by_username
from app.utils.security import create_access_token, create_refresh_token, verify_token
from datetime import timedelta

router = APIRouter(prefix="/auth", tags=["authentication"])

@router.post("/login", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Login endpoint that returns JWT tokens."""
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    access_token = create_access_token(
        data={"sub": user.username, "user_id": user.id, "roles": [role.value for role in user.roles]}
    )
    refresh_token = create_refresh_token(
        data={"sub": user.username, "user_id": user.id}
    )
    
    return Token(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer"
    )

@router.post("/register", response_model=User)
async def register_user(user: UserCreate):
    """Register new user endpoint."""
    if get_user_by_email(user.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )
    
    if get_user_by_username(user.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already taken"
        )
    
    db_user = create_user(user)
    return User(
        id=db_user.id,
        email=db_user.email,
        username=db_user.username,
        full_name=db_user.full_name,
        roles=db_user.roles,
        is_active=db_user.is_active
    )

@router.post("/refresh", response_model=Token)
async def refresh_token(refresh_token: str):
    """Refresh access token using refresh token."""
    payload = verify_token(refresh_token, "refresh")
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token"
        )
    
    username = payload.get("sub")
    user = get_user_by_username(username) or get_user_by_email(username)
    
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive"
        )
    
    access_token = create_access_token(
        data={"sub": user.username, "user_id": user.id, "roles": [role.value for role in user.roles]}
    )
    new_refresh_token = create_refresh_token(
        data={"sub": user.username, "user_id": user.id}
    )
    
    return Token(
        access_token=access_token,
        refresh_token=new_refresh_token,
        token_type="bearer"
    )

@router.get("/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    """Get current user information."""
    return current_user

Create protected API routes with role-based access

Implement example API endpoints with different access levels based on user roles in app/routers/protected.py.

from fastapi import APIRouter, Depends
from app.models.user import User, UserRole
from app.auth.oauth2 import get_current_user, require_roles
from typing import List

router = APIRouter(prefix="/api", tags=["protected"])

@router.get("/public")
async def public_endpoint():
    """Public endpoint that doesn't require authentication."""
    return {"message": "This is a public endpoint"}

@router.get("/private")
async def private_endpoint(current_user: User = Depends(get_current_user)):
    """Private endpoint that requires authentication."""
    return {
        "message": f"Hello {current_user.full_name or current_user.username}!",
        "user_id": current_user.id,
        "roles": current_user.roles
    }

@router.get("/admin")
async def admin_endpoint(current_user: User = Depends(require_roles([UserRole.ADMIN]))):
    """Admin-only endpoint that requires admin role."""
    return {
        "message": "Admin access granted",
        "admin_user": current_user.username,
        "sensitive_data": "This is admin-only information"
    }

@router.get("/moderator")
async def moderator_endpoint(
    current_user: User = Depends(require_roles([UserRole.ADMIN, UserRole.MODERATOR]))
):
    """Moderator endpoint that requires admin or moderator role."""
    return {
        "message": "Moderator access granted",
        "user": current_user.username,
        "roles": current_user.roles,
        "moderation_tools": ["ban_user", "delete_post", "edit_content"]
    }

@router.get("/users")
async def list_users(current_user: User = Depends(require_roles([UserRole.ADMIN]))):
    """Admin endpoint to list all users."""
    from app.auth.database import users_db
    
    users = [
        {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "full_name": user.full_name,
            "roles": user.roles,
            "is_active": user.is_active
        }
        for user in users_db.values()
    ]
    
    return {"users": users, "total": len(users)}

Create the main FastAPI application

Set up the main application in app/main.py with CORS, security headers, and route registration.

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse
from app.routers import auth, protected
import time

app = FastAPI(
    title="FastAPI Authentication Demo",
    description="JWT and OAuth2 authentication with role-based access control",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# Security middleware
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["localhost", "127.0.0.1", "*.example.com"]
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "https://example.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    """Add security headers to all responses."""
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    return response

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Add processing time header."""
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

# Include routers
app.include_router(auth.router)
app.include_router(protected.router)

@app.get("/")
async def root():
    """Root endpoint with API information."""
    return {
        "message": "FastAPI Authentication API",
        "version": "1.0.0",
        "docs": "/docs",
        "authentication": "JWT Bearer tokens",
        "endpoints": {
            "login": "/auth/login",
            "register": "/auth/register",
            "refresh": "/auth/refresh",
            "profile": "/auth/me"
        }
    }

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "timestamp": time.time()}

if __name__ == "__main__":
    # Run with: python -m app.main (from the project root, so the app package is importable)
    import uvicorn
    uvicorn.run(
        "app.main:app",
        host="127.0.0.1",
        port=8000,
        reload=True,
        log_level="info"
    )
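The decorator-based middleware above can also be expressed as a plain ASGI middleware, which makes it easier to see what actually happens: the response-start message is intercepted and extra headers are appended before it is sent on. The sketch below uses only the standard library. The SecurityHeadersMiddleware class and the run_demo helper are hypothetical names for this illustration.

```python
import asyncio

# Header names are lowercase bytes, as the ASGI specification expects
SECURITY_HEADERS = [
    (b"x-content-type-options", b"nosniff"),
    (b"x-frame-options", b"DENY"),
    (b"x-xss-protection", b"1; mode=block"),
    (b"strict-transport-security", b"max-age=31536000; includeSubDomains"),
]


class SecurityHeadersMiddleware:
    """Wrap an ASGI app and add security headers to every HTTP response."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Leave websocket and lifespan traffic untouched
            await self.app(scope, receive, send)
            return

        async def send_with_headers(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                existing = {name.lower() for name, _ in headers}
                headers.extend(h for h in SECURITY_HEADERS if h[0] not in existing)
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_headers)


def run_demo() -> list:
    """Drive the middleware with a tiny fake ASGI app and return the sent messages."""

    async def demo_app(scope, receive, send):
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [(b"content-type", b"text/plain")],
        })
        await send({"type": "http.response.body", "body": b"ok"})

    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    asyncio.run(SecurityHeadersMiddleware(demo_app)({"type": "http"}, receive, send))
    return sent
```

A class of this shape could be registered with app.add_middleware(SecurityHeadersMiddleware) in place of the decorated function. Which style to use is largely a matter of preference.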

Install python-dotenv for environment variables

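Install the python-dotenv package explicitly so the application can load environment variables from the .env file. The uvicorn[standard] extra installed earlier already pulls it in, so this step mainly guards against a slimmer install.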

pip install python-dotenv

Generate a secure secret key

Create a cryptographically secure secret key for JWT token signing.

openssl rand -hex 32

Copy the generated key and update your .env file with this secure value.
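If openssl is not available, Python's standard secrets module produces an equivalent value. This is a small sketch, and generate_secret_key is a hypothetical helper name.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return num_bytes of cryptographically secure randomness as hex (2 characters per byte)."""
    return secrets.token_hex(num_bytes)
```

Calling generate_secret_key() yields a 64-character hex string, the same shape as the output of openssl rand -hex 32, ready to paste into the SECRET_KEY line of .env.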

Start the FastAPI development server

Launch the application using Uvicorn ASGI server with hot reloading enabled.

cd ~/fastapi-auth
source fastapi-env/bin/activate
uvicorn app.main:app --reload --host 127.0.0.1 --port 8000

Configure JWT token generation and validation

Test JWT token generation

Create a simple test script named test_jwt.py in the project root to verify that JWT token creation and validation work correctly.

from app.utils.security import create_access_token, verify_token
from datetime import timedelta

# Test data
test_data = {"sub": "testuser", "user_id": 1, "roles": ["user"]}

# Create token
token = create_access_token(data=test_data, expires_delta=timedelta(minutes=30))
print(f"Generated token: {token[:50]}...")

# Verify token
payload = verify_token(token, "access")
if payload:
    print(f"Token valid! User: {payload.get('sub')}")
    print(f"Roles: {payload.get('roles')}")
    print(f"Expires: {payload.get('exp')}")
else:
    print("Token invalid!")

Run the script from the project root:

python test_jwt.py

Configure token expiration settings

Customize token lifetimes based on your security requirements.

# Short-lived access tokens for better security
ACCESS_TOKEN_EXPIRE_MINUTES=15

# Longer-lived refresh tokens
REFRESH_TOKEN_EXPIRE_DAYS=7

# For development, you might use longer access tokens
# (set only one ACCESS_TOKEN_EXPIRE_MINUTES value per .env file)
# ACCESS_TOKEN_EXPIRE_MINUTES=60
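These settings end up as the exp claim, a Unix timestamp in seconds. The sketch below shows the arithmetic with timezone-aware datetimes. The helper names expiry_timestamp and seconds_remaining are assumptions for this example and do not exist in the application code.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def expiry_timestamp(minutes: int = 0, days: int = 0, now: Optional[datetime] = None) -> int:
    """Return the Unix timestamp at which a token issued at 'now' expires."""
    issued_at = now or datetime.now(timezone.utc)
    return int((issued_at + timedelta(minutes=minutes, days=days)).timestamp())


def seconds_remaining(exp: int, now: Optional[datetime] = None) -> int:
    """Return how many seconds are left before 'exp', never less than zero."""
    current = now or datetime.now(timezone.utc)
    return max(0, exp - int(current.timestamp()))
```

With the values above, a 15-minute access token lives for 900 seconds and a 7-day refresh token for 604800 seconds. A client could use seconds_remaining to refresh shortly before the access token runs out.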

Implement role-based access control

Create additional user roles

Extend the role system by adding more user roles to the enum and database.

# Add to existing UserRole enum
class UserRole(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"
    EDITOR = "editor"
    VIEWER = "viewer"

Create role hierarchy checking

Implement role hierarchy where higher roles inherit permissions from lower roles.

from fastapi import Depends, HTTPException, status
from app.auth.oauth2 import get_current_user
from app.models.user import User, UserRole
from typing import List

# Define role hierarchy (higher roles inherit lower role permissions)
ROLE_HIERARCHY = {
    UserRole.VIEWER: 1,
    UserRole.USER: 2,
    UserRole.EDITOR: 3,
    UserRole.MODERATOR: 4,
    UserRole.ADMIN: 5
}

def has_permission(user_roles: List[UserRole], required_role: UserRole) -> bool:
    """Check if user has permission based on role hierarchy."""
    user_max_level = max([ROLE_HIERARCHY.get(role, 0) for role in user_roles], default=0)
    required_level = ROLE_HIERARCHY.get(required_role, 0)
    return user_max_level >= required_level

def require_permission(required_role: UserRole):
    """Dependency to check role hierarchy permissions."""
    def permission_checker(current_user: User = Depends(get_current_user)) -> User:
        if not has_permission(current_user.roles, required_role):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires {required_role.value} permission or higher"
            )
        return current_user
    return permission_checker
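To see how the hierarchy behaves without starting the API, the comparison can be exercised on its own. This is a self-contained sketch that repeats the enum and the level table so it runs standalone. ROLE_LEVEL is simply a local name for this example.

```python
from enum import Enum
from typing import Iterable


class UserRole(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"
    EDITOR = "editor"
    VIEWER = "viewer"


# Same ordering as the hierarchy above: a larger number means more privileges
ROLE_LEVEL = {
    UserRole.VIEWER: 1,
    UserRole.USER: 2,
    UserRole.EDITOR: 3,
    UserRole.MODERATOR: 4,
    UserRole.ADMIN: 5,
}


def has_permission(user_roles: Iterable[UserRole], required_role: UserRole) -> bool:
    """True if the user's highest role is at or above the required role."""
    user_max_level = max((ROLE_LEVEL.get(role, 0) for role in user_roles), default=0)
    return user_max_level >= ROLE_LEVEL.get(required_role, 0)
```

For instance, has_permission([UserRole.ADMIN], UserRole.EDITOR) is True because 5 >= 3, while has_permission([UserRole.VIEWER], UserRole.USER) is False because 1 < 2. A user with no roles fails every check against a role listed in the table. One caveat of this design is that a required role missing from the table maps to level 0 and would therefore pass for everyone, so every enum member should have an entry.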

Verify your setup

Test your FastAPI authentication system using curl commands or the interactive API documentation.

# Check if the server is running
curl http://localhost:8000/health

# Test user registration
curl -X POST "http://localhost:8000/auth/register" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "newuser@example.com",
    "username": "newuser",
    "password": "securepass123",
    "full_name": "New User"
  }'

# Test login and get tokens
curl -X POST "http://localhost:8000/auth/login" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=admin@example.com&password=admin123"

# Test protected endpoint (replace TOKEN with actual token from login)
curl -X GET "http://localhost:8000/api/private" \
  -H "Authorization: Bearer TOKEN"

# Test admin endpoint
curl -X GET "http://localhost:8000/api/admin" \
  -H "Authorization: Bearer TOKEN"

# Check API documentation
echo "Visit http://localhost:8000/docs for interactive API documentation"

Note: The interactive API documentation at /docs provides a convenient way to test all endpoints with built-in authentication support.
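The same checks can be scripted in Python, which avoids copying the token by hand. The sketch below uses only urllib from the standard library and assumes the server is listening on http://localhost:8000 as in the commands above. The function names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8000"


def build_login_request(username: str, password: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """Form-encoded POST, which is what the OAuth2 password flow expects."""
    form = urllib.parse.urlencode({"username": username, "password": password}).encode("ascii")
    return urllib.request.Request(
        f"{base_url}/auth/login",
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_authorized_request(path: str, token: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """GET request carrying the access token as a Bearer credential."""
    return urllib.request.Request(
        f"{base_url}{path}",
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )


def login_and_fetch(username: str, password: str, path: str = "/api/private") -> dict:
    """Log in, then call a protected endpoint with the returned access token."""
    with urllib.request.urlopen(build_login_request(username, password)) as response:
        tokens = json.load(response)
    with urllib.request.urlopen(build_authorized_request(path, tokens["access_token"])) as response:
        return json.load(response)
```

With the server running, login_and_fetch("admin@example.com", "admin123", "/api/admin") should return the admin payload, while the same call with the test user's credentials should raise urllib.error.HTTPError with status 403.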

Create production deployment script

Create systemd service file

Set up a systemd service for production deployment with proper user permissions and security. Save the unit as /etc/systemd/system/fastapi-auth.service.

[Unit]
Description=FastAPI Authentication API
After=network.target

[Service]
Type=simple
User=fastapi
Group=fastapi
WorkingDirectory=/opt/fastapi-auth
Environment=PATH=/opt/fastapi-auth/fastapi-env/bin
ExecStart=/opt/fastapi-auth/fastapi-env/bin/uvicorn app.main:app --host 127.0.0.1 --port 8000 --workers 4
Restart=always
RestartSec=3

# Security settings
NoNewPrivileges=yes
PrivateTmp=yes
ProtectSystem=strict
ProtectHome=yes
ReadWritePaths=/opt/fastapi-auth/logs

[Install]
WantedBy=multi-user.target

Create dedicated user and set permissions

Create a dedicated system user for running the FastAPI application securely.

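# Create system user
sudo useradd --system --shell /bin/false --home-dir /opt/fastapi-auth fastapi

# Copy application to production directory
sudo cp -r ~/fastapi-auth /opt/

# Virtual environments are not relocatable (their scripts embed absolute paths),
# so rebuild the copied environment in place and reinstall the dependencies
sudo python3.12 -m venv --clear /opt/fastapi-auth/fastapi-env
sudo /opt/fastapi-auth/fastapi-env/bin/pip install "fastapi[all]" "uvicorn[standard]" "python-jose[cryptography]" "passlib[bcrypt]" python-multipart python-dotenv

sudo chown -R fastapi:fastapi /opt/fastapi-auth
sudo chmod -R 755 /opt/fastapi-auth
sudo chmod 600 /opt/fastapi-auth/.env

# Create logs directory
sudo mkdir -p /opt/fastapi-auth/logs
sudo chown fastapi:fastapi /opt/fastapi-auth/logs

Never use chmod 777. It gives every user on the system full access to your files. Instead, use specific ownership with chown and minimal permissions like 755 for directories and 644 for files.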

Enable and start the service

Enable the systemd service and configure it to start automatically on boot.

sudo systemctl daemon-reload
sudo systemctl enable fastapi-auth.service
sudo systemctl start fastapi-auth.service
sudo systemctl status fastapi-auth.service

Common issues

Symptom | Cause | Fix
"Could not validate credentials" error | Invalid or expired JWT token | Check token expiration, verify SECRET_KEY matches, ensure proper Bearer format
"Inactive user" error | User account is disabled | Set user.is_active = True in database or user model
"Insufficient permissions" error | User lacks required role | Check user roles in database, verify require_roles dependency usage
Import errors when starting | Missing dependencies or inactive virtual environment | Activate the virtual environment and rerun the pip install command from the dependency step
"Invalid refresh token" error | Refresh token expired or malformed | Generate new refresh token through login, check token type in payload
CORS errors in frontend | Origin not allowed in CORS middleware | Add frontend origin to allow_origins list in CORSMiddleware
Password verification fails | Incorrect bcrypt configuration | Verify passlib configuration, check password hashing in user creation
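Several of the fixes above come down to looking inside the token. The payload segment is only base64url-encoded, so it can be inspected without the secret. The following is a debugging sketch with hypothetical helper names. It deliberately skips signature verification and must never be used to authenticate a request.

```python
import base64
import json
import time
from typing import Optional


def peek_claims(token: str) -> dict:
    """Decode the payload segment WITHOUT verifying the signature (debugging only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected three dot-separated segments")
    payload = parts[1] + "=" * (-len(parts[1]) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(payload))


def describe_token(token: str, now: Optional[float] = None) -> str:
    """Summarize subject, token type, and expiry state in one line."""
    claims = peek_claims(token)
    current = time.time() if now is None else now
    exp = claims.get("exp")
    if exp is None:
        state = "no exp claim"
    elif exp <= current:
        state = f"expired {int(current - exp)}s ago"
    else:
        state = f"valid for {int(exp - current)}s"
    return f"sub={claims.get('sub')!r} type={claims.get('type')!r} {state}"
```

A summary showing type='refresh' on a token sent to a protected endpoint, or an expired state, points directly at the first and fifth rows of the table.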
