인증 구현하기
1. 인증 구현
앞서 배운 JWT를 사용하여 사용자 인증을 구현해보겠습니다. 사용자는 회원가입을 통해 계정을 생성하고, 로그인을 통해 토큰을 발급받습니다. 이 토큰을 통해 사용자는 인증된 요청을 보낼 수 있습니다.
이러한 인증을 구현해놓으면 기존에 구현된 API도 수정을 해야 합니다. 예를 들어, '수정'의 경우에는 로그인 한 사용자 중에서도 자신이 작성한 글만 수정할 수 있어야 합니다. '삭제'도 마찬가지입니다. 저장 방식 또한 고민해볼 필요가 있습니다. 여러분이 숨긴 게시물을 보거나, 채팅 기록을 보거나, 비밀번호를 확인할 수 있어서도 안될 것입니다.
이런 요구사항을 모두 구현한 실제 운영되는 블로그 서비스를 만들기 위해서는 많은 고민과 노력이 필요합니다. 또한 합당한 라이브러리를 사용하고 있는지 냉정하게 살펴볼 필요가 있습니다. 이러한 서비스를 만들기 위해 필요한 기술은 Django와 같은 프레임웤에는 이미 구현되어 있기 때문입니다. 많이 하는 실수 중 하나가 '배운 기술이'라서 기술을 선택하는 것입니다.
이 챕터에서는 간단한 블로그에 인증을 구현해보겠습니다. 우선은 코드를 조각내여 각각의 기능을 확인하고, 마지막에 하나로 합쳐보겠습니다.
1.1 필요한 모듈 설치
JWT 토큰 생성과 검증을 위해 python-jose
를, 비밀번호 해싱을 위해 passlib
를 설치합니다.
pip install python-jose[cryptography] passlib[bcrypt] python-multipart
pip install python-jose[cryptography] passlib[bcrypt] python-multipart
- python-jose[cryptography]: JWT 토큰 생성 및 검증
- passlib[bcrypt]: 비밀번호 해싱
- python-multipart: 파일 업로드를 위한 멀티파트 데이터 처리
만약 이 챕터로 바로 들어왔다면 아래 명령어로 설치해주세요.
pip install fastapi uvicorn sqlalchemy pydantic python-jose[cryptography] passlib[bcrypt] python-multipart
pip install fastapi uvicorn sqlalchemy pydantic python-jose[cryptography] passlib[bcrypt] python-multipart
1.2 데이터베이스 모델 수정
사용자 정보를 저장할 User 모델을 추가합니다.
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
class UserModel(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, nullable=False)
hashed_password = Column(String, nullable=False)
created_at = Column(String, nullable=False)
class BlogModel(Base):
__tablename__ = "blogs"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
content = Column(String, nullable=False)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(String, nullable=False)
updated_at = Column(String, nullable=False)
author = relationship("UserModel", back_populates="blogs")
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
class UserModel(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, nullable=False)
hashed_password = Column(String, nullable=False)
created_at = Column(String, nullable=False)
class BlogModel(Base):
__tablename__ = "blogs"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
content = Column(String, nullable=False)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(String, nullable=False)
updated_at = Column(String, nullable=False)
author = relationship("UserModel", back_populates="blogs")
1.3 인증 관련 유틸리티 함수
비밀번호 해싱과 JWT 토큰 생성/검증을 위한 함수들을 구현합니다.
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
# 비밀번호 해싱을 위한 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT 설정
SECRET_KEY = "your-secret-key" # 실제 운영환경에서는 환경변수로 관리해야 합니다
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
# 비밀번호 해싱을 위한 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT 설정
SECRET_KEY = "your-secret-key" # 실제 운영환경에서는 환경변수로 관리해야 합니다
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None
1.4 FastAPI 엔드포인트 구현
회원가입과 로그인을 위한 엔드포인트를 구현합니다.
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Pydantic 모델
class UserCreate(BaseModel):
email: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
id: int
email: str
created_at: str
class Config:
from_attributes = True
# 현재 인증된 사용자 가져오기
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
payload = verify_token(token)
if payload is None:
raise credentials_exception
user_id: int = payload.get("sub")
if user_id is None:
raise credentials_exception
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if user is None:
raise credentials_exception
return user
# 회원가입
@app.post("/register", response_model=User)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
# 이메일 중복 체크
db_user = db.query(UserModel).filter(UserModel.email == user_data.email).first()
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
# 비밀번호 해싱
hashed_password = get_password_hash(user_data.password)
# 사용자 생성
db_user = UserModel(
email=user_data.email,
hashed_password=hashed_password,
created_at=datetime.now().strftime("%Y-%m-%d")
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# 로그인
@app.post("/token", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(UserModel).filter(UserModel.email == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": str(user.id)}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Pydantic 모델
class UserCreate(BaseModel):
email: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
id: int
email: str
created_at: str
class Config:
from_attributes = True
# 현재 인증된 사용자 가져오기
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
payload = verify_token(token)
if payload is None:
raise credentials_exception
user_id: int = payload.get("sub")
if user_id is None:
raise credentials_exception
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if user is None:
raise credentials_exception
return user
# 회원가입
@app.post("/register", response_model=User)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
# 이메일 중복 체크
db_user = db.query(UserModel).filter(UserModel.email == user_data.email).first()
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
# 비밀번호 해싱
hashed_password = get_password_hash(user_data.password)
# 사용자 생성
db_user = UserModel(
email=user_data.email,
hashed_password=hashed_password,
created_at=datetime.now().strftime("%Y-%m-%d")
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# 로그인
@app.post("/token", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(UserModel).filter(UserModel.email == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": str(user.id)}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
1.5 블로그 API에 인증 적용
기존 블로그 API에 인증을 적용합니다.
# 블로그 글 생성 (인증 필요)
@app.post("/blogs", response_model=Blog)
def create_blog(
blog_create_data: BlogCreate,
current_user: UserModel = Depends(get_current_user),
db: Session = Depends(get_db)
):
now = datetime.now().strftime("%Y-%m-%d")
new_blog = BlogModel(
title=blog_create_data.title,
content=blog_create_data.content,
author_id=current_user.id,
created_at=now,
updated_at=now,
)
db.add(new_blog)
db.commit()
db.refresh(new_blog)
return new_blog
# 블로그 글 수정 (인증 필요)
@app.put("/blogs/{blog_id}", response_model=Blog)
def update_blog(
blog_id: int,
blog_update_data: BlogUpdate,
current_user: UserModel = Depends(get_current_user),
db: Session = Depends(get_db)
):
blog = db.query(BlogModel).filter(BlogModel.id == blog_id).first()
if not blog:
raise HTTPException(status_code=404, detail="Blog not found")
if blog.author_id != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized to update this blog")
blog.title = blog_update_data.title
blog.content = blog_update_data.content
blog.updated_at = datetime.now().strftime("%Y-%m-%d")
db.commit()
db.refresh(blog)
return blog
# 블로그 글 삭제 (인증 필요)
@app.delete("/blogs/{blog_id}")
def delete_blog(
blog_id: int,
current_user: UserModel = Depends(get_current_user),
db: Session = Depends(get_db)
):
blog = db.query(BlogModel).filter(BlogModel.id == blog_id).first()
if not blog:
raise HTTPException(status_code=404, detail="Blog not found")
if blog.author_id != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized to delete this blog")
db.delete(blog)
db.commit()
return {"message": "Blog deleted successfully"}
# 블로그 글 생성 (인증 필요)
@app.post("/blogs", response_model=Blog)
def create_blog(
blog_create_data: BlogCreate,
current_user: UserModel = Depends(get_current_user),
db: Session = Depends(get_db)
):
now = datetime.now().strftime("%Y-%m-%d")
new_blog = BlogModel(
title=blog_create_data.title,
content=blog_create_data.content,
author_id=current_user.id,
created_at=now,
updated_at=now,
)
db.add(new_blog)
db.commit()
db.refresh(new_blog)
return new_blog
# 블로그 글 수정 (인증 필요)
@app.put("/blogs/{blog_id}", response_model=Blog)
def update_blog(
blog_id: int,
blog_update_data: BlogUpdate,
current_user: UserModel = Depends(get_current_user),
db: Session = Depends(get_db)
):
blog = db.query(BlogModel).filter(BlogModel.id == blog_id).first()
if not blog:
raise HTTPException(status_code=404, detail="Blog not found")
if blog.author_id != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized to update this blog")
blog.title = blog_update_data.title
blog.content = blog_update_data.content
blog.updated_at = datetime.now().strftime("%Y-%m-%d")
db.commit()
db.refresh(blog)
return blog
# 블로그 글 삭제 (인증 필요)
@app.delete("/blogs/{blog_id}")
def delete_blog(
blog_id: int,
current_user: UserModel = Depends(get_current_user),
db: Session = Depends(get_db)
):
blog = db.query(BlogModel).filter(BlogModel.id == blog_id).first()
if not blog:
raise HTTPException(status_code=404, detail="Blog not found")
if blog.author_id != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized to delete this blog")
db.delete(blog)
db.commit()
return {"message": "Blog deleted successfully"}
2. 프론트엔드 수정
인증 기능을 사용하기 위해 프론트엔드 코드도 수정해야 합니다. 로컬 스토리지에 토큰을 저장하고, API 요청 시 이 토큰을 함께 전송합니다.
2.1 로그인 페이지
static/login.html
파일을 생성하고 다음과 같이 작성합니다.
<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<title>로그인</title>
</head>
<body>
<h1>로그인</h1>
<form id="loginForm">
<input type="email" name="email" placeholder="이메일" required>
<input type="password" name="password" placeholder="비밀번호" required>
<button type="submit">로그인</button>
</form>
<a href="/static/register.html">회원가입</a>
<script>
const form = document.getElementById('loginForm');
form.addEventListener('submit', async (e) => {
e.preventDefault();
const formData = new FormData();
formData.append('username', e.target.email.value);
formData.append('password', e.target.password.value);
try {
const response = await fetch('http://localhost:8000/token', {
method: 'POST',
body: formData
});
const data = await response.json();
if (response.ok) {
localStorage.setItem('token', data.access_token);
window.location.href = '/static/blog_list.html';
} else {
alert('로그인 실패');
}
} catch (error) {
console.error('Error:', error);
alert('로그인 중 오류가 발생했습니다.');
}
});
</script>
</body>
</html>
<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<title>로그인</title>
</head>
<body>
<h1>로그인</h1>
<form id="loginForm">
<input type="email" name="email" placeholder="이메일" required>
<input type="password" name="password" placeholder="비밀번호" required>
<button type="submit">로그인</button>
</form>
<a href="/static/register.html">회원가입</a>
<script>
const form = document.getElementById('loginForm');
form.addEventListener('submit', async (e) => {
e.preventDefault();
const formData = new FormData();
formData.append('username', e.target.email.value);
formData.append('password', e.target.password.value);
try {
const response = await fetch('http://localhost:8000/token', {
method: 'POST',
body: formData
});
const data = await response.json();
if (response.ok) {
localStorage.setItem('token', data.access_token);
window.location.href = '/static/blog_list.html';
} else {
alert('로그인 실패');
}
} catch (error) {
console.error('Error:', error);
alert('로그인 중 오류가 발생했습니다.');
}
});
</script>
</body>
</html>
2.2 회원가입 페이지
static/register.html
파일을 생성하고 다음과 같이 작성합니다.
<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<title>회원가입</title>
</head>
<body>
<h1>회원가입</h1>
<form id="registerForm">
<input type="email" name="email" placeholder="이메일" required>
<input type="password" name="password" placeholder="비밀번호" required>
<button type="submit">회원가입</button>
</form>
<a href="/static/login.html">로그인으로 돌아가기</a>
<script>
const form = document.getElementById('registerForm');
form.addEventListener('submit', async (e) => {
e.preventDefault();
try {
const response = await fetch('http://localhost:8000/register', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
email: e.target.email.value,
password: e.target.password.value
})
});
if (response.ok) {
alert('회원가입 성공');
window.location.href = '/static/login.html';
} else {
const data = await response.json();
alert(`회원가입 실패: ${data.detail}`);
}
} catch (error) {
console.error('Error:', error);
alert('회원가입 중 오류가 발생했습니다.');
}
});
</script>
</body>
</html>
<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<title>회원가입</title>
</head>
<body>
<h1>회원가입</h1>
<form id="registerForm">
<input type="email" name="email" placeholder="이메일" required>
<input type="password" name="password" placeholder="비밀번호" required>
<button type="submit">회원가입</button>
</form>
<a href="/static/login.html">로그인으로 돌아가기</a>
<script>
const form = document.getElementById('registerForm');
form.addEventListener('submit', async (e) => {
e.preventDefault();
try {
const response = await fetch('http://localhost:8000/register', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
email: e.target.email.value,
password: e.target.password.value
})
});
if (response.ok) {
alert('회원가입 성공');
window.location.href = '/static/login.html';
} else {
const data = await response.json();
alert(`회원가입 실패: ${data.detail}`);
}
} catch (error) {
console.error('Error:', error);
alert('회원가입 중 오류가 발생했습니다.');
}
});
</script>
</body>
</html>
이제 모든 API 요청에 인증 토큰을 포함시켜야 합니다. 예를 들어, 블로그 글 작성 시에는 다음과 같이 헤더에 토큰을 포함시킵니다.
const token = localStorage.getItem('token');
if (!token) {
window.location.href = '/static/login.html';
return;
}
const response = await fetch('http://localhost:8000/blogs', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
title: title,
content: content
})
});
const token = localStorage.getItem('token');
if (!token) {
window.location.href = '/static/login.html';
return;
}
const response = await fetch('http://localhost:8000/blogs', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
title: title,
content: content
})
});
이렇게 하면 인증된 사용자만 블로그 글을 작성, 수정, 삭제할 수 있게 됩니다. 또한 각 사용자는 자신이 작성한 글만 수정하고 삭제할 수 있습니다.
3. 전체 소스코드
전체 소스코드는 아래 GitHub에서 확인하실 수 있습니다.
fast api 소스코드