WeniVooks

검색

FastAPI 베이스캠프

JWT를 이용한 사용자 인증 구현

1. 라우팅 및 세팅

1.1 URL 정보

이번 챕터의 URL 구성은 아래와 같습니다.

경로 함수명 메서드 설명
/token login_for_access_token POST 사용자 로그인 및 액세스 토큰 발급
/users/me/ read_users_me GET 현재 인증된 사용자 정보 조회
/users/ create_user POST 새로운 사용자 등록
1.2 기본 세팅

이번 실습 폴더는 04_3_jwt_auth입니다. VSC 터미널에서 사용할 명령어 입니다. 가상환경은 벗어난 상태에서 실행해야 합니다. 만약 터미널 입력창 앞에 (venv)라고 되어 있다면 deactivate 명령어로 가상환경을 나간 상태에서 cd ..으로 상위 폴더로 나와 아래 명령어를 실행해주세요.

mkdir 04_3_jwt_auth
cd 04_3_jwt_auth
python -m venv venv
.\venv\Scripts\activate
pip install fastapi[all] python-jose[cryptography] passlib[bcrypt]
mkdir 04_3_jwt_auth
cd 04_3_jwt_auth
python -m venv venv
.\venv\Scripts\activate
pip install fastapi[all] python-jose[cryptography] passlib[bcrypt]

2. JWT 구현 방식과 기본 세팅

JWT를 구현하기 위해 DB를 사용해야 하지만, 이번 실습에서는 DB 대신 메모리 내 파이썬 데이터 구조를 사용하여 데이터를 저장할 것입니다. 또한 복잡도를 낮추기 위해 리프레시 토큰을 구현하지 않습니다. 이는 과제로 남겨두었습니다. 이는 개념을 간단히 설명하기 위한 것이며, 실제 애플리케이션에서는 보통 앞서 학습한 데이터베이스를 사용합니다.

또 JWT를 구현하기 위해는 User가 있어야 합니다. User는 일반 테이블보다 고려해야 할 사항이 많습니다. 예를 들어, 패스워드를 저장할 때는 해싱을 해야 합니다. 관리자가 패스워드를 DB에서 확인하더라도, 어떤 패스워드인지 알 수 없게 해야 하기 때문입니다.

이번 실습에서는 간단한 User를 구현하고, JWT를 이용하여 사용자 인증을 구현해보겠습니다.

3. 코드 설명

코드에 대한 주요 개념을 먼저 설명하고, 코드 구현은 아래 섹션에서 진행하겠습니다. 헷갈리고 생소한 개념이 많을 수 있으니 천천히, 반복해서 읽어보세요. 주석은 코드 설명 챕터에만 있고, 코드 구현 챕터에는 없습니다.

3.1 JWT 설정
SECRET_KEY = "your-secret-key"  # 실제 환경에서는 안전한 방식으로 관리해야 합니다
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
SECRET_KEY = "your-secret-key"  # 실제 환경에서는 안전한 방식으로 관리해야 합니다
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
  • SECRET_KEY: JWT 서명에 사용되는 비밀 키입니다. 실제 환경에서는 안전하게 관리해야 합니다. 보통은 별도의 파일로 관리를 합니다.
  • ALGORITHM: JWT 암호화에 사용되는 알고리즘입니다. 선택할 수 있는 알고리즘은 여러 가지가 있습니다. 대표적으로 HS256, RS256, ES256 등이 있습니다. 모두 단방향 암호화 방식입니다.
  • ACCESS_TOKEN_EXPIRE_MINUTES: 액세스 토큰의 유효 기간(분)입니다. 만료되면 재발급 받거나 다시 로그인해야 합니다.
3.2 사용자 모델
class UserCreate(BaseModel):
    username: str
    email: EmailStr | None = None
    full_name: str | None = None
    password: str
 
 
class User(BaseModel):
    username: str
    email: EmailStr | None = None
    full_name: str | None = None
 
 
class UserInDB(User):
    hashed_password: str
 
 
class Token(BaseModel):
    access_token: str
    token_type: str
 
 
class TokenData(BaseModel):
    username: str | None = None
class UserCreate(BaseModel):
    username: str
    email: EmailStr | None = None
    full_name: str | None = None
    password: str
 
 
class User(BaseModel):
    username: str
    email: EmailStr | None = None
    full_name: str | None = None
 
 
class UserInDB(User):
    hashed_password: str
 
 
class Token(BaseModel):
    access_token: str
    token_type: str
 
 
class TokenData(BaseModel):
    username: str | None = None
  • UserCreate: 사용자 생성 시 사용되는 모델로, 비밀번호를 포함합니다.
  • User: 기본 사용자 정보를 담는 모델입니다.
  • UserInDB: 데이터베이스에 저장될 사용자 정보 모델입니다 (해시된 비밀번호 포함).
  • Token: JWT 토큰 정보를 담는 모델입니다.
  • TokenData: 토큰에서 추출된 데이터를 담는 모델입니다.

UserCreaet과 User는 왜 구분되어 있고, UserInDB는 왜 따로 있을까요? 합칠 수는 없었을까요? User에는 민감한 정보가 있습니다. 물론 응답값에서 이 값을 제외할 수 있지만 노출될 위험을 사전에 방지할 수 있으며 용도를 명확히 할 수 있기에 분리하였습니다. 만약 더 단순화 하고 싶다면 위 3개의 모델을 User 하나로 합칠 수도 있습니다.

3.3 주요 함수
def verify_password(plain_password, hashed_password):
    """
    평문 비밀번호와 해시된 비밀번호를 비교합니다.
    
    Args:
        plain_password (str): 사용자가 입력한 평문 비밀번호
        hashed_password (str): 데이터베이스에 저장된 해시된 비밀번호
    
    Returns:
        bool: 비밀번호가 일치하면 True, 그렇지 않으면 False
    """
    return pwd_context.verify(plain_password, hashed_password)
 
def get_password_hash(password):
    """
    비밀번호를 안전하게 해시합니다.
    
    Args:
        password (str): 해시할 평문 비밀번호
    
    Returns:
        str: 해시된 비밀번호
    """
    return pwd_context.hash(password)
 
def get_user(db, username: str):
    """
    사용자 이름으로 사용자 정보를 조회합니다.
    
    Args:
        db (dict): 사용자 정보가 저장된 데이터베이스(이 예제에서는 딕셔너리)
        username (str): 조회할 사용자의 사용자 이름
    
    Returns:
        UserInDB: 찾은 사용자 정보. 사용자가 없으면 None
    """
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
 
def authenticate_user(fake_db, username: str, password: str):
    """
    사용자 인증을 수행합니다.
    
    Args:
        fake_db (dict): 사용자 정보가 저장된 가상 데이터베이스
        username (str): 인증할 사용자의 사용자 이름
        password (str): 인증할 사용자의 비밀번호
    
    Returns:
        UserInDB: 인증 성공 시 사용자 정보, 실패 시 False
    """
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
 
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """
    JWT 액세스 토큰을 생성합니다.
    
    Args:
        data (dict): 토큰에 인코딩할 데이터
        expires_delta (timedelta, optional): 토큰의 만료 시간. 기본값은 15분
    
    Returns:
        str: 생성된 JWT 액세스 토큰
    """
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
 
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """
    현재 인증된 사용자 정보를 반환합니다.
    
    이 함수는 FastAPI의 의존성 주입 시스템을 사용하여 요청 헤더에서 
    JWT 토큰을 추출하고, 이를 검증하여 현재 인증된 사용자를 식별합니다.
 
    Args:
        token (str): OAuth2PasswordBearer에 의해 추출된 JWT 토큰
 
    Returns:
        UserInDB: 현재 인증된 사용자의 정보
 
    Raises:
        HTTPException: 토큰이 유효하지 않거나 사용자를 찾을 수 없는 경우
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
def verify_password(plain_password, hashed_password):
    """
    평문 비밀번호와 해시된 비밀번호를 비교합니다.
    
    Args:
        plain_password (str): 사용자가 입력한 평문 비밀번호
        hashed_password (str): 데이터베이스에 저장된 해시된 비밀번호
    
    Returns:
        bool: 비밀번호가 일치하면 True, 그렇지 않으면 False
    """
    return pwd_context.verify(plain_password, hashed_password)
 
def get_password_hash(password):
    """
    비밀번호를 안전하게 해시합니다.
    
    Args:
        password (str): 해시할 평문 비밀번호
    
    Returns:
        str: 해시된 비밀번호
    """
    return pwd_context.hash(password)
 
def get_user(db, username: str):
    """
    사용자 이름으로 사용자 정보를 조회합니다.
    
    Args:
        db (dict): 사용자 정보가 저장된 데이터베이스(이 예제에서는 딕셔너리)
        username (str): 조회할 사용자의 사용자 이름
    
    Returns:
        UserInDB: 찾은 사용자 정보. 사용자가 없으면 None
    """
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
 
def authenticate_user(fake_db, username: str, password: str):
    """
    사용자 인증을 수행합니다.
    
    Args:
        fake_db (dict): 사용자 정보가 저장된 가상 데이터베이스
        username (str): 인증할 사용자의 사용자 이름
        password (str): 인증할 사용자의 비밀번호
    
    Returns:
        UserInDB: 인증 성공 시 사용자 정보, 실패 시 False
    """
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
 
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """
    JWT 액세스 토큰을 생성합니다.
    
    Args:
        data (dict): 토큰에 인코딩할 데이터
        expires_delta (timedelta, optional): 토큰의 만료 시간. 기본값은 15분
    
    Returns:
        str: 생성된 JWT 액세스 토큰
    """
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
 
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """
    현재 인증된 사용자 정보를 반환합니다.
    
    이 함수는 FastAPI의 의존성 주입 시스템을 사용하여 요청 헤더에서 
    JWT 토큰을 추출하고, 이를 검증하여 현재 인증된 사용자를 식별합니다.
 
    Args:
        token (str): OAuth2PasswordBearer에 의해 추출된 JWT 토큰
 
    Returns:
        UserInDB: 현재 인증된 사용자의 정보
 
    Raises:
        HTTPException: 토큰이 유효하지 않거나 사용자를 찾을 수 없는 경우
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

OAuth2PasswordBearer는 FastAPI의 보안 유틸리티로, OAuth2 비밀번호 흐름을 사용하여 토큰을 처리합니다.

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  • tokenUrl="token": 토큰을 발급받을 수 있는 엔드포인트를 지정합니다. 여기서는 "/token" 엔드포인트를 사용합니다.
  • 이 객체는 요청 헤더에서 "Authorization" 필드를 확인하여 Bearer 토큰을 추출합니다.
  • 토큰이 없거나 형식이 올바르지 않으면 자동으로 401 Unauthorized 응답을 반환합니다.

get_current_user 함수는 이 oauth2_scheme을 사용하여 다음과 같은 작업을 수행합니다.

  1. 요청 헤더에서 JWT 토큰을 추출합니다.
  2. 추출된 토큰을 디코딩하고 유효성을 검사합니다.
  3. 토큰에서 사용자 이름을 추출하고, 이를 사용해 데이터베이스에서 사용자 정보를 조회합니다.
  4. 모든 검증 단계를 통과하면 현재 인증된 사용자 정보를 반환합니다.

이 함수를 다른 엔드포인트에서 사용하면, 해당 엔드포인트는 자동으로 인증된 사용자만 접근할 수 있게 됩니다.

추가로 토큰을 Bearer에 담아 보내는 JavaScript 코드는 다음과 같습니다. 프론트엔드에서는 토큰을 받아 이를 브라우저에 저장하거나 쿠키에 저장하여 사용자 인증을 유지하고, 인증이 필요한 요청은 아래와 같이 토큰을 요청 헤더에 담아 보내야 합니다.

fetch('http://localhost:8000/sample/', {
  method: 'GET', // or 'POST', 'PUT', 'DELETE'
  headers: {
    'Authorization': 'Bearer ' + token
  }
})
fetch('http://localhost:8000/sample/', {
  method: 'GET', // or 'POST', 'PUT', 'DELETE'
  headers: {
    'Authorization': 'Bearer ' + token
  }
})
3.4 엔드포인트
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
 
 
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user
 
 
@app.post("/users/", response_model=User)
async def create_user(user: UserCreate):
    if user.username in fake_users_db:
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = get_password_hash(user.password)
    user_dict = user.model_dump()
    del user_dict["password"]
    user_dict["hashed_password"] = hashed_password
    fake_users_db[user.username] = user_dict
    return User(**user_dict)
 
@app.get("/")
async def read_db():
    return fake_users_db
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
 
 
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user
 
 
@app.post("/users/", response_model=User)
async def create_user(user: UserCreate):
    if user.username in fake_users_db:
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = get_password_hash(user.password)
    user_dict = user.model_dump()
    del user_dict["password"]
    user_dict["hashed_password"] = hashed_password
    fake_users_db[user.username] = user_dict
    return User(**user_dict)
 
@app.get("/")
async def read_db():
    return fake_users_db
  • /token (POST): 사용자 로그인 및 토큰 발급
  • /users/me/ (GET): 현재 인증된 사용자 정보 조회
  • /users/ (POST): 새 사용자 생성
  • / (GET): DB 조회

실제 애플리케이션에서는 /로 데이터를 조회하진 않습니다. 이는 실습을 위한 코드입니다.

3.5 API 테스트 함수 연결

아래 그림과 같이 함수들이 연결됩니다. 테스트까지 한 다음 다시 올라와서 확인해주세요.

API 테스트 함수 연결

머메이드 코드로 표현하면 다음과 같습니다. 라이브 머메이드에서 확인해주세요.

graph TD
    A[Start] --> B[Create User]
    B --> |POST /users/| C[create_user function]
    C --> |Calls| D[get_password_hash]
    C --> E[Store in fake_users_db]
    
    F[Login] --> |POST /token| G[login_for_access_token function]
    G --> |Calls| H[authenticate_user]
    H --> |Calls| I[get_user]
    H --> |Calls| J[verify_password]
    G --> |Calls| K[create_access_token]
    
    L[Get User Info] --> |GET /users/me/| M[read_users_me function]
    M --> |Depends on| N[get_current_user]
    N --> |Calls| O[jwt.decode]
    N --> |Calls| P[get_user]
graph TD
    A[Start] --> B[Create User]
    B --> |POST /users/| C[create_user function]
    C --> |Calls| D[get_password_hash]
    C --> E[Store in fake_users_db]
    
    F[Login] --> |POST /token| G[login_for_access_token function]
    G --> |Calls| H[authenticate_user]
    H --> |Calls| I[get_user]
    H --> |Calls| J[verify_password]
    G --> |Calls| K[create_access_token]
    
    L[Get User Info] --> |GET /users/me/| M[read_users_me function]
    M --> |Depends on| N[get_current_user]
    N --> |Calls| O[jwt.decode]
    N --> |Calls| P[get_user]

4. main.py 작성

main.py 파일에 아래 코드를 작성해주세요.

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
 
# JWT 설정
SECRET_KEY = "your-secret-key"  # 실제 환경에서는 안전한 방식으로 관리해야 합니다
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
 
app = FastAPI()
 
# 비밀번호 암호화 설정
# 아래 코드는 비밀번호를 해싱하는 방법을 정의합니다.
# pwd_context는 비밀번호를 검증하고 해싱하는 데 사용됩니다.
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
# OAuth2 설정
# OAuth2PasswordBearer는 토큰을 받아서 사용자를 인증하는 방법을 정의합니다.
# 아래 코드는 OAuth2PasswordBearer를 사용하여 토큰을 받아서 사용자를 인증합니다.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
fake_users_db = {}
 
 
class UserCreate(BaseModel):
    username: str
    email: EmailStr | None = None
    full_name: str | None = None
    password: str
 
 
class User(BaseModel):
    username: str
    email: EmailStr | None = None
    full_name: str | None = None
 
 
class UserInDB(User):
    hashed_password: str
 
 
class Token(BaseModel):
    access_token: str
    token_type: str
 
 
class TokenData(BaseModel):
    username: str | None = None
 
 
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)
 
 
def get_password_hash(password):
    return pwd_context.hash(password)
 
 
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
 
 
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
 
 
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
 
 
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
 
 
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
 
 
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user
 
 
@app.post("/users/", response_model=User)
async def create_user(user: UserCreate):
    if user.username in fake_users_db:
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = get_password_hash(user.password)
    user_dict = user.model_dump()
    del user_dict["password"]
    user_dict["hashed_password"] = hashed_password
    fake_users_db[user.username] = user_dict
    return User(**user_dict)
 
@app.get("/")
async def read_db():
    return fake_users_db
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
 
# JWT 설정
SECRET_KEY = "your-secret-key"  # 실제 환경에서는 안전한 방식으로 관리해야 합니다
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
 
app = FastAPI()
 
# 비밀번호 암호화 설정
# 아래 코드는 비밀번호를 해싱하는 방법을 정의합니다.
# pwd_context는 비밀번호를 검증하고 해싱하는 데 사용됩니다.
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
# OAuth2 설정
# OAuth2PasswordBearer는 토큰을 받아서 사용자를 인증하는 방법을 정의합니다.
# 아래 코드는 OAuth2PasswordBearer를 사용하여 토큰을 받아서 사용자를 인증합니다.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
fake_users_db = {}
 
 
class UserCreate(BaseModel):
    username: str
    email: EmailStr | None = None
    full_name: str | None = None
    password: str
 
 
class User(BaseModel):
    username: str
    email: EmailStr | None = None
    full_name: str | None = None
 
 
class UserInDB(User):
    hashed_password: str
 
 
class Token(BaseModel):
    access_token: str
    token_type: str
 
 
class TokenData(BaseModel):
    username: str | None = None
 
 
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)
 
 
def get_password_hash(password):
    return pwd_context.hash(password)
 
 
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
 
 
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
 
 
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
 
 
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
 
 
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
 
 
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user
 
 
@app.post("/users/", response_model=User)
async def create_user(user: UserCreate):
    if user.username in fake_users_db:
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = get_password_hash(user.password)
    user_dict = user.model_dump()
    del user_dict["password"]
    user_dict["hashed_password"] = hashed_password
    fake_users_db[user.username] = user_dict
    return User(**user_dict)
 
@app.get("/")
async def read_db():
    return fake_users_db

5. 애플리케이션 실행

터미널에서 다음 명령어를 실행하여 애플리케이션을 시작합니다:

uvicorn main:app --reload
uvicorn main:app --reload

6. API 테스트

6.1 새 사용자 생성
  • URL: http://localhost:8000/users/
  • Method: POST
  • Body:
    {
      "username": "testuser",
      "email": "testuser@example.com",
      "full_name": "Test User",
      "password": "secretpassword"
    }
    {
      "username": "testuser",
      "email": "testuser@example.com",
      "full_name": "Test User",
      "password": "secretpassword"
    }
6.2 로그인 및 토큰 발급
  • URL: http://localhost:8000/token
  • Method: POST
  • Body (form-data):
    • username: testuser
    • password: secretpassword
6.3 사용자 정보 조회
  • URL: http://localhost:8000/users/me/
  • Method: GET
  • Headers:
    • Authorization: Bearer {your_access_token}

연습 문제

  1. 리프레시 토큰을 추가하여 토큰 갱신 기능을 구현해보세요.
  2. DB를 사용하여 사용자 정보를 저장하도록 코드를 수정해보세요.
4.2 JWT 토큰이란5장 FastAPI 실전 활용